Name | Total Lines | Lines of Code | Total Coverage | Code Coverage |
---|---|---|---|---|
lib/asir/transport.rb | 268 | 180 | 42.91% | 17.78% |
Code reported as executed by Ruby looks like this... and this: this line is also marked as covered. Lines considered as run by rcov, but not reported by Ruby, look like this, and this: these lines were inferred by rcov (using simple heuristics). Finally, here's a line marked as not executed.
1 require 'time' |
2 |
3 module ASIR |
4 # !SLIDE |
5 # Transport |
6 # |
7 # Client: Send the Request to the Service. |
8 # Service: Receive the Request from the Client. |
9 # Service: Invoke the Request. |
10 # Service: Send the Response to the Client. |
11 # Client: Receive the Response from the Service. |
12 class Transport |
13 include Log, Initialization, AdditionalData |
14 |
15 attr_accessor :encoder, :decoder, :one_way |
16 |
17 # Incremented for each request sent or received. |
18 attr_accessor :request_count |
19 |
20 # A Proc to call within #receive_request, after #_receive_request. |
21 # trans.after_receive_request.call(trans, request) |
22 attr_accessor :after_receive_request |
23 |
24 # A Proc to call within #send_request, before #_send_request. |
25 # trans.before_send_request.call(trans, request) |
26 attr_accessor :before_send_request |
27 |
28 # Proc to call after #_send_response if response.exception. |
29 # trans.on_response_exception.call(trans, response) |
30 attr_accessor :on_response_exception |
31 |
32 # Proc to call with exception, if exception occurs within #serve_request!, but outside |
33 # Request#invoke!. |
34 # |
35 # trans.on_exception.call(trans, exception, :request, Request_instance) |
36 # trans.on_exception.call(trans, exception, :response, Response_instance) |
37 attr_accessor :on_exception |
38 |
39 attr_accessor :needs_request_identifier, :needs_request_timestamp |
40 alias :needs_request_identifier? :needs_request_identifier |
41 alias :needs_request_timestamp? :needs_request_timestamp |
42 |
43 attr_accessor :verbose |
44 |
# Defaults @verbose to 0 before delegating to the next #initialize in the
# ancestor chain, which receives the same arguments (bare `super`).
def initialize(*args)
  @verbose = 0
  super
end
49 |
50 # !SLIDE |
51 # Transport#send_request |
52 # * Encode Request. |
53 # * Send encoded Request. |
54 # * Receive decoded Response. |
# Client side: counts the request, optionally stamps it with a timestamp
# and identifier, runs the @before_send_request hook, encodes the request,
# sends it via #_send_request and returns whatever #receive_response
# returns for the opaque response.
def send_request(request)
  @request_count = (@request_count || 0) + 1
  request.create_timestamp! if needs_request_timestamp?
  request.create_identifier! if needs_request_identifier?
  hook = @before_send_request
  hook.call(self, request) if hook
  # Called for its side effect on request.delay; the return value is unused here.
  relative_request_delay!(request)
  # The encoder is dup'ed for each request rather than used directly.
  payload = encoder.dup.encode(request)
  receive_response(request, _send_request(request, payload))
end
65 |
66 # !SLIDE |
67 # Transport#receive_request |
68 # Receive Request payload from stream. |
# Service side: counts the request, reads [payload, state] through
# #_receive_request, replaces the payload with the decoded request and
# merges any additional data collected while receiving into it.
# Returns the [request, state] pair, or the falsy value that
# #_receive_request returned.
def receive_request(stream)
  @request_count = (@request_count || 0) + 1
  extra = {}
  req_and_state = _receive_request(stream, extra)
  return req_and_state unless req_and_state

  # Requests are decoded with a dup of the *encoder*.
  request = encoder.dup.decode(req_and_state.first)
  req_and_state[0] = request
  request.additional_data!.update(extra) if request
  if @after_receive_request
    begin
      @after_receive_request.call(self, request)
    rescue ::Exception => error
      # A failing hook is logged and otherwise ignored.
      _log { [ :receive_request, :after_receive_request, :exception, error ] }
    end
  end
  req_and_state
end
87 # !SLIDE END |
88 |
89 # !SLIDE |
90 # Transport#send_response |
91 # Send Response to stream. |
# Service side: runs the @on_response_exception hook when the response
# carries an exception (a failing hook is logged, not raised). Then either
# hands the response to request.block (one-way transport with a block) or
# encodes it with a dup of the decoder and passes it to #_send_response.
def send_response(response, stream, request_state)
  request = response.request
  notify_failure = @on_response_exception && response.exception
  if notify_failure
    begin
      @on_response_exception.call(self, response)
    rescue ::Exception => error
      _log { [ :send_response, :response, response, :on_response_exception, error ] }
    end
  end
  return request.block.call(response) if @one_way && request.block

  # Detach the Request so it is not sent back along with the Response.
  response.request = nil
  payload = decoder.dup.encode(response)
  _send_response(request, response, payload, stream, request_state)
end
109 # !SLIDE END |
110 |
111 # !SLIDE |
112 # Transport#receive_response |
113 # Receive Response from stream: |
114 # * Receive Response payload |
115 # * Decode Response. |
116 # * Extract Response result or exception. |
# Client side: fetches the response payload via #_receive_response and
# decodes it with a dup of the decoder.
# Returns nil when nothing was decoded, the value of exception.invoke!
# when the response carries an exception, and response.result otherwise
# (after calling request.block on two-way transports).
def receive_response(request, opaque_response)
  payload = _receive_response(request, opaque_response)
  response = decoder.dup.decode(payload)
  return nil unless response

  failure = response.exception
  return failure.invoke! if failure

  request.block.call(response) if !@one_way && request.block
  response.result
end
131 # !SLIDE END |
132 |
# Placeholder for the transport primitives that concrete subclasses must
# implement. Accepts and ignores any arguments.
#
# Raises Error::SubclassResponsibility. The comma after the class name is
# required: without it Ruby parses the line as a call to a method named
# SubclassResponsibility on Error and raises NoMethodError instead.
def _subclass_responsibility(*args)
  raise Error::SubclassResponsibility, "subclass responsibility"
end
alias :_send_request :_subclass_responsibility
alias :_receive_request :_subclass_responsibility
alias :_send_response :_subclass_responsibility
alias :_receive_response :_subclass_responsibility
140 |
141 # !SLIDE |
142 # Serve a Request. |
# Receives one request from in_stream, invokes it and, when out_stream is
# given, sends the response.
#
# Returns self when a request was served, nil when none was received.
# When an exception is rescued, the return value is that of the rescue body.
#
# An exception raised while receiving or invoking is logged and reported
# through @on_exception with :request. If it happened after the request was
# received, it is wrapped in a Response and sent back. Exceptions matching
# Error::Unforwardable.unforwardable are wrapped in Error::Unforwardable for
# the response, and the original is re-raised after the response is sent.
# A failure while sending is logged and reported with :response.
def serve_request!(in_stream, out_stream)
  request = request_state = response = nil
  request_ok = response_ok = nil
  failure = original_failure = unforwardable = nil
  request, request_state = receive_request(in_stream)
  return nil unless request

  request_ok = true
  response = invoke_request!(request)
  response_ok = true
  self
rescue ::Exception => error
  failure = original_failure = error
  _log([ :request_error, error ])
  @on_exception.call(self, error, :request, request) if @on_exception
ensure
  begin
    if request_ok
      if failure && !response_ok
        case failure
        when *Error::Unforwardable.unforwardable
          unforwardable = failure = Error::Unforwardable.new(failure)
        end
        response = Response.new(request, nil, failure)
      end
      send_response(response, out_stream, request_state) if out_stream
    end
  rescue ::Exception => send_error
    _log([ :response_error, send_error ])
    @on_exception.call(self, send_error, :response, response) if @on_exception
  end
  raise original_failure if unforwardable
end
179 |
180 # !SLIDE pause |
181 |
182 # !SLIDE |
183 # Transport Server Support |
184 |
# Runs the given block with TERM and HUP handlers installed, then restores
# the handlers that were in place before, even if the block raises.
#
# The installed handler clears @running. Unless a request is currently
# being processed (@processing_request), it also raises
# ::ASIR::Error::Terminate.
def with_server_signals!
  previous_handlers = {}
  %w[TERM HUP].each do |signal_name|
    handler = proc do |*args|
      @running = false
      next if @processing_request
      raise ::ASIR::Error::Terminate, "#{self} by SIG#{signal_name} #{args.inspect} in #{__FILE__}:#{__LINE__}"
    end
    # Signal.trap returns the handler it replaces; keep it for restoration.
    previous_handlers[signal_name] = Signal.trap(signal_name, handler)
  end
  yield
ensure
  previous_handlers.each do |signal_name, handler|
    begin
      Signal.trap(signal_name, handler)
    rescue StandardError
      # Best effort: a handler that cannot be restored is skipped.
      nil
    end
  end
end
203 |
204 # !SLIDE |
205 # Transport Support |
206 # ... |
207 |
# Returns the configured encoder, creating a Coder::Identity on first use
# when none has been assigned.
def encoder
  @encoder = Coder::Identity.new unless @encoder
  @encoder
end
212 |
# Returns the configured decoder, falling back to (and remembering) the
# encoder when none has been assigned.
def decoder
  @decoder = encoder unless @decoder
  @decoder
end
217 |
# Waits out any delay on the request, invokes it and returns the value of
# request.invoke!.
# @processing_request is true for the duration and is reset to its previous
# value afterwards, even if the wait or the invocation raises.
def invoke_request!(request)
  was_processing = @processing_request
  @processing_request = true
  begin
    wait_for_delay!(request)
    request.invoke!
  ensure
    @processing_request = was_processing
  end
end
227 |
# Returns the number of seconds from now that the request should be delayed.
#
# * request.delay nil: returns nil.
# * request.delay Numeric: returns it as a Float and replaces request.delay
#   with the absolute UTC Time (now + delay).
# * request.delay Time: returns (request.delay - now).to_f, or 0 when that
#   time has already passed.
#
# now defaults to Time.now. Raises TypeError for any other kind of delay.
def relative_request_delay!(request, now = nil)
  requested = request.delay
  return nil if requested.nil?

  if Numeric === requested
    now ||= Time.now
    seconds = requested.to_f
    request.delay = (now + seconds).utc
    seconds
  elsif Time === requested
    now ||= Time.now
    remaining = (requested - now).to_f
    remaining < 0 ? 0 : remaining
  else
    raise TypeError, "Expected request.delay to be Numeric or Time, given #{requested.class}"
  end
end
249 |
# Sleeps until #relative_request_delay! reports no remaining delay
# (nil/false, or a value not greater than 0). The delay is re-evaluated
# after every sleep. Returns self.
def wait_for_delay!(request)
  remaining = relative_request_delay!(request)
  while remaining && remaining > 0
    sleep(remaining)
    remaining = relative_request_delay!(request)
  end
  self
end
256 |
257 # !SLIDE END |
258 # !SLIDE resume |
259 |
# Clears @running. Returns self, unless force is truthy, in which case
# Error::Terminate is raised after the flag has been cleared.
def stop!(force = false)
  @running = false
  return self unless force

  raise Error::Terminate
end
265 |
266 end |
267 # !SLIDE END |
268 end |
Generated on Fri Jan 27 17:37:46 -0600 2012 with rcov 0.9.8