Stuff Gets Complicated

Systems become:

  • bigger →
  • complex →
  • slower →
  • distributed →
  • hard to test

Back when things were simple…

class CustomersController < ApplicationController
  def send_invoice
    @customer = Customer.find(params[:id])
    Email.send_email(:pdf_invoice,
                     :to => @customer.email,
                     :customer => @customer)
  end
end

Trying to improve user’s experience…

class CustomersController < ApplicationController
  def send_invoice
    @customer = Customer.find(params[:id])
    Process.fork do
      Email.send_email(:pdf_invoice,
                       :to = @customer.email,
                       :customer => @customer)
    end
  end
end

Use other machines to poll a work table…

class CustomersController < ApplicationController
  def send_invoice
    @customer = Customer.find(params[:id])
    EmailWork.create(:template_name => :pdf_invoice,
                     :options => {
                       :to => @customer.email,
                       :customer => @customer,
                     })
  end
end

Use queue infrastructure

class CustomersController < ApplicationController
  def send_invoice
    @customer = Customer.find(params[:id])
    queue_service.put(:queue => :Email,
                      :action => :send_email,
                      :template_name => :pdf_invoice,
                      :options => {
                        :to => @customer.email,
                        :customer => @customer,
                      })
  end
end

Issues

  • Problem Domain, Solution Domain
  • Service Middleware Semantics
  • Testing, Debugging, Diagnostics
  • Productivity

Problem Domain, Solution Domain

  • Client knows too much about infrastructure.
  • Evaluating and switching infrastructures.

Service Middleware Semantics

  • Directionality: One-way, Two-way
  • Synchronicity: Synchronous, Asynchronous, Delayed, Buffered
  • Distribution: Local Thread, Local Process, Distributed
  • Transport: File, IPC, Pipe, Network, Beanstalk, ZMQ, Resque
  • Robustness: Retry, Replay, Fallback
  • Encoding: XML, JSON, YAML, Base64, Compression

Testing, Debugging, Diagnostics

  • Configuration for testing and QA is more complex.
  • Measuring test coverage of remote services.
  • Debugging the root cause of remote service errors.
  • Diagnostic hooks.

Objectives

  • Simplify service/client definitions and interfaces.
  • Anticipate new encoding, delivery and security requirements.
  • Separate encoding and transport concerns.
  • Composition over Configuration.
  • Elide deployment decisions.
  • Integrate diagnostics and logging.
  • Simplify testing.

Foundations of Objects

  • Message
  • State
  • Behavior

Messaging

  • “Send a Message”, “Call a Function” are all the same, in all languages.
    • Decomposed into lookup() and apply().
  • “Send a Message”, not “Call a Method”.
  • Messaging abstracts:
    • Behavior from Implementation.
    • Transfer of control (method, function invocation, RPC, etc).

REST

Roy Fielding – Architectural Styles and the Design of Network-based Software Architectures

  • Imperative Action .vs. Behavorial Resource
  • REST Connector .vs. REST Component
  • “Generality of connectors leads to middleware…”
  • “Modifiability is about the ease with which a change can be made to an application architecture… broken down into evolvability, extensibility, customizability, configurability, and reusability…”

Design

  • Nouns → Objects → Classes
  • Verbs → Responsibilities → Methods

Book: “Designing Object-Oriented Software”

  • Wirfs-Brock, Wilkerson, Wiener

Design: Nouns

  • Service → Object
  • Client → Just a Ruby message sender.
  • Proxy
  • Message → Just a Ruby message.
  • Result, Exception (two-way) → Return value or else.
  • Transport → (file, pipe, http, queue, socket, ZMQ).
  • Encoder, Decoder → Coder (Marshal, XML, JSON, zlib).

Design: Verbs

  • Intercept Message → Proxy
  • Invoke Message → Message
  • Return Result, Invoke Exception → Result
  • Send Message, Recieve Message → Transport
  • Encode Object, Decode Object → Coder

Abstraction Leads to Rich Features

  • One-way and two-way requests as Module or instance methods.
  • Message support:
    • Delayed Messages.
  • Transports:
    • Null, Local, File, Named Pipe, TCP.
    • HTTP under WEBrick or as Rack application.
    • Beanstalkd, ZeroMQ, Resque.
    • Buffered, Broadcast, Fallback, Demux transports.
    • Time-decaying retry logic.
  • Encodings:
    • Marshal, XML, JSON, YAML, Base64, ZLib.
    • Chained encodings.
    • Signed payloads.

Simple

Client-Side Message

Server-Side

Client-Side Result

Sample Service

module Email
  def send_email template_name, options
    $stderr.puts "*** #{$$}: Email.send_mail #{template_name.inspect} #{options.inspect}"
    :ok
  end
  def do_raise msg
    raise msg
  end
  extend self
end

Using a Client Proxy

Email.send_email(:pdf_invoice,
                 :to => "user@email.com",
                 :customer => @customer)
# ->
Email.asir.
      send_email(:pdf_invoice,
                 :to => "user@email.com",
                 :customer => @customer)

Example Message

Email.asir.send_email(:pdf_invoice,
                        :to => "user@email.com",
                        :customer => @customer)
# ->
message = Message.new(...)
message.receiver_class == ::Module
message.receiver == ::Email
message.selector == :send_email
message.arguments == [ :pdf_invoice,
                       { :to => "user@email.com",
                         :customer => ... } ]

Client support for any Module

Extend Module with #client proxy support.

  module Client
    # Client Mixin
    def self.included target; ...; end 
    # Client Proxy
    class Proxy; ...; end 
  end

Client Proxy

Provide client interface proxy to a service.

  module Client
    class Proxy
      attr_accessor :receiver, :receiver_class

      # Accept messages as a proxy for the receiver.
      # Blocks are used represent a "continuation" for the Result.
      def send selector, *arguments, &block
        message = Message.new(@receiver, selector, arguments, block, self)
        message = @before_send_message.call(message) if @before_send_message
        @__configure.call(message, self) if @__configure
        result = transport.send_message(message)
        result
      end
      # Accept all other messages to be encoded and transported to a service.
      alias :method_missing :send

      # Client Transport
      def transport; ...; end 
      # Proxy Configuration
      def _configure &blk; ...; end 
      # Configuration Callbacks
      def initialize rcvr, rcvr_class; ...; end 
    end
  end

Message

Encapsulate the Ruby message from the Client to be handled by the Service.

  class Message
    include AdditionalData, Identity, CodeMore
    attr_accessor :receiver, :receiver_class, :selector, :arguments, :block
    attr_accessor :result, :one_way

    def initialize r, s, a, b, p
      @receiver, @selector, @arguments = r, s, a
      @block = b if b
      @receiver_class = @receiver.class
      @one_way = p._one_way if p
    end

    def invoke!
      @result = Result.new(self, @receiver.__send__(@selector, *@arguments))
    rescue *Error::Unforwardable.unforwardable => exc
      @result = Result.new(self, nil, Error::Unforwardable.new(exc))
    rescue ::Exception => exc
      @result = Result.new(self, nil, exc)
    end

    # Optional: Specifies the Numeric seconds or absolute Time for the Transport to delay the Message until actual invocation.
    attr_accessor :delay
  end

Result

Encapsulate the result returned to the Client.

  class Result
    include AdditionalData, Identity, CodeMore::Result
    attr_accessor :message, :result, :exception
    # Optional: Opaque data about the server that processed the Message.
    attr_accessor :server

    def initialize msg, res = nil, exc = nil
      @message = msg; @result = res
      @exception = exc && EncapsulatedException.new(exc)
      @identifier = @message.identifier
    end
  end

Coder

Define encoding and decoding for Messages and Results along a Transport.

  class Coder
    include Log, Initialization

    def encode obj
      _encode obj
    end

    def decode obj
      obj and _decode obj
    end

    # Coder subclasses:
    def _subclass_responsibility *args
      raise "subclass responsibility"
    end
    alias :_encode :_subclass_responsibility
    alias :_decode :_subclass_responsibility
  end

Identity Coder

Perform no encode/decode.

    class Identity < self
      def _encode obj
        obj
      end

      def _decode obj
        obj
      end

      # Completely stateless.
      def dup; self; end
    end

Transport

Client: Send the Message to the Service.
Service: Receive the Message from the Client.
Service: Invoke the Message.
Service: Send the Result to the Client.
Client: Receive the Result from the Service.

  class Transport
    include Log, Initialization, AdditionalData, Message::Delay, ThreadVariable, Conduit

    attr_accessor :encoder, :decoder, :one_way

    # Transport#send_message
    def send_message message; ...; end 
    # Transport#receive_message
    def receive_message stream; ...; end 

    # Transport#send_result
    def send_result result, stream, message_state; ...; end 

    # Transport#receive_result
    def receive_result message, opaque_result; ...; end 

    def initialize *args
      @verbose = 0
      super
    end

    # Incremented for each message sent or received.
    attr_accessor :message_count

    # A Proc to call within #receive_message, after #_receive_message.
    # trans.after_receive_message(trans, message)
    attr_accessor :after_receive_message

    # A Proc to call within #send_message, before #_send_message.
    # trans.before_send_message(trans, message)
    attr_accessor :before_send_message

    # Proc to call with #invoke_message! if result.exception.
    # trans.on_result_exception.call(trans, result)
    attr_accessor :on_result_exception

    # Proc to call with exception, if exception occurs within #serve_message!, but outside
    # Message#invoke!.
    #
    # trans.on_exception.call(trans, exception, :message, Message_instance, nil)
    # trans.on_exception.call(trans, exception, :result, Message_instance, Result_instance)
    attr_accessor :on_exception

    attr_accessor :needs_message_identifier, :needs_message_timestamp
    def needs_message_identifier? m; @needs_message_identifier; end
    def needs_message_timestamp?  m; @needs_message_timestamp; end

    attr_accessor :verbose

    def _subclass_responsibility *args
      raise Error::SubclassResponsibility "subclass responsibility"
    end
    alias :_send_message :_subclass_responsibility
    alias :_receive_message :_subclass_responsibility
    alias :_send_result :_subclass_responsibility
    alias :_receive_result :_subclass_responsibility

    # Serve a Message.
    def serve_message! in_stream, out_stream; ...; end 
    # Transport Server Support
    def stop! force = false; ...; end 
    # Transport Support
    def encoder; ...; end 

  end

Client Transport

  module Client
    class Proxy
      attr_accessor :transport

      def transport
        @transport ||= Transport::Local.new
      end
    end
  end

Call service directly

require 'example_helper' # !COMMENT
pr Email.send_email(:pdf_invoice,
                    :to => "user@email.com",
                    :customer => @customer)

Call service directly – Output

*** 10517: client process
*** 10517: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10517: pr: :ok

          
        

In-core, in-process service

require 'example_helper'
pr Email.asir.send_email(:pdf_invoice,
                           :to => "user@email.com",
                           :customer => @customer)

Sample Service with Client Support

require 'asir'
# Added .asir support.
module Email
  include ASIR::Client # Email.asir
  def send_email template_name, options
    $stderr.puts "*** #{$$}: Email.send_mail #{template_name.inspect} #{options.inspect}"
    :ok
  end
  def do_raise msg
    raise msg
  end
  extend self
end

In-core, in-process service – Output

*** 10520: client process
*** 10520: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10520: pr: :ok

          
        

One-way, asynchronous subprocess service

require 'example_helper'
begin
  Email.asir.transport = t =
    ASIR::Transport::Subprocess.new

  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com",
                             :customer => @customer)
end

Subprocess Transport

Send one-way Message to a forked subprocess.

    class Subprocess < Local
      def initialize *args
        @one_way = true; super
      end

      def _send_message message, message_payload
        Process.fork do
          send_result(super, nil, nil)
        end
      end

      # one-way; no Result
      def _receive_result message, opaque_result
      end

      # one-way; no Result
      def _send_result message, result, result_payload, stream, message_state
      end
    end

One-way, asynchronous subprocess service – Output

*** 10523: client process
*** 10523: pr: nil
*** 10525: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}

          
        

Subprocess service with continuation

require 'example_helper'
begin
  Email.asir.transport = t =
    ASIR::Transport::Subprocess.new(:one_way => true)
  pr(Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com",
                             :customer => @customer) { | resp |
     pr [ :in_block, resp.result ] })
end

One-way, file log service

require 'example_helper'
begin
  File.unlink(service_log = "#{__FILE__}.service.log") rescue nil
  Email.asir.transport = t =
    ASIR::Transport::File.new(:file => service_log)
  t.encoder =
    ASIR::Coder::Yaml.new
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com",
                             :customer => @customer)
ensure
  t.close
  puts "\x1a\n#{service_log.inspect} contents:"
  puts File.read(service_log)
end

One-way, file log service – Output

*** 10527: client process
*** 10527: pr: nil

MORE

One-way, file log service – Output – Page 2

".riterate/ex04.rb-1-capture.rb.service.log" contents:
# asir_payload_size: 169
--- !ruby/object:ASIR::Message 
arguments: 
- :pdf_invoice
- :to: user@email.com
  :customer: 123
one_way: 
receiver: Email
receiver_class: Module
selector: :send_email

# asir_payload_end

Replay file log

require 'example_helper'
begin
  service_log = "#{__FILE__.sub('ex05', 'ex04')}.service.log"
  Email.asir.transport = t =
    ASIR::Transport::File.new(:file => service_log)
  t.encoder =
    ASIR::Coder::Yaml.new
  t.serve_file!
end

Replay file log – Output

*** 10530: client process
*** 10530: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}

          
        

One-way, named pipe service

$stderr.puts "  #{$$} at #{__FILE__}:#{__LINE__}"

require 'example_helper'
begin
  File.unlink(service_pipe = "service.pipe") rescue nil
  Email.asir.transport = t =
    ASIR::Transport::File.new(:file => service_pipe)
  t.encoder =
    ASIR::Coder::Yaml.new
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr Email.asir.send_email(:pdf_invoice, :to => "user@email.com", :customer => @customer)
ensure
  t.close; sleep 1; server_kill
end

Named Pipe Server

    class File < Stream
      def prepare_server!
        # _log [ :prepare_pipe_server!, file ]
        unless ::File.exist? file
          system(cmd = "mkfifo #{file.inspect}") or raise "cannot run #{cmd.inspect}"
          ::File.chmod(perms, file) rescue nil if perms
        end
      end
      alias :prepare_pipe_server! :prepare_server!

      def run_server!
        # _log [ :run_pipe_server!, file ]
        with_server_signals! do
          @running = true
          while @running
            serve_file!
          end
        end
      end
      alias :run_pipe_server! :run_server!
    end

One-way, named pipe service – Output

  10533 at .riterate/ex06.rb-1-capture.rb:14


*** 10533: client process
*** 10535: server process
*** 10533: pr: nil
*** 10535: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}

          
        

Chain Coder

Chain multiple Coders as one.

message  --> | e1 | --> | e2 | --> | eN | -->
result   <-- | d1 | <-- | d2 | <-- | dN | <--
    class Chain < self
      attr_accessor :encoders

      def _encode obj
        encoders.each do | e |
          obj = e.dup.encode(obj)
        end
        obj
      end

      def _decode obj
        encoders.reverse_each do | e |
          obj = e.dup.decode(obj)
        end
        obj
      end
    end
  end
end

require 'asir/coder'

module ASIR
  class Coder

One-way, named pipe service with signature

require 'example_helper'
begin
  File.unlink(service_pipe = "service.pipe") rescue nil
  Email.asir.transport = t =
    ASIR::Transport::File.new(:file => service_pipe)
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
      [ ASIR::Coder::Marshal.new,
        s = ASIR::Coder::Sign.new(:secret => 'abc123'),
        ASIR::Coder::Yaml.new,
      ])
  t.prepare_pipe_server!
  server_process do
    t.run_pipe_server!
  end
  pr Email.asir.send_email(:pdf_invoice, :to => "user@email.com", :customer => @customer)
ensure
  t.close; sleep 1; server_kill
end

One-way, named pipe service with signature – Output

*** 10538: client process
*** 10541: server process
*** 10538: pr: nil
*** 10541: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}

          
        

One-way, named pipe service with invalid signature

require 'example_helper'
begin
  File.unlink(service_pipe = "service.pipe") rescue nil
  Email.asir.transport = t =
    ASIR::Transport::File.new(:file => service_pipe)
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
      [ ASIR::Coder::Marshal.new,
        s = ASIR::Coder::Sign.new(:secret => 'abc123'),
        ASIR::Coder::Yaml.new,
      ])
  t.prepare_pipe_server!
  server_process do
    t.run_pipe_server!
  end
  s.secret = 'I do not know the secret! :('
  pr Email.asir.send_email(:pdf_invoice, :to => "user@email.com", :customer => @customer)
ensure
  t.close; sleep 1; server_kill
end

One-way, named pipe service with invalid signature – Output

*** 10543: client process
*** 10546: server process
*** 10543: pr: nil
  10546 ASIR::Transport::File :message_error, #<ASIR::Coder::Sign::SignatureErro\
r: signature invalid>

Socket service

require 'example_helper'
begin
  Email.asir.transport = t =
    ASIR::Transport::TcpSocket.new(:port => 30909)
  t.encoder =
    ASIR::Coder::Marshal.new
  t.prepare_server!
  server_process do
    t.run_server!
  end
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com", :customer => @customer)
ensure
  t.close; sleep 1; server_kill
end

Socket service – Output

*** 10548: client process
*** 10550: server process
*** 10550: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10548: pr: :ok

          
        

Socket service with forwarded exception.

require 'example_helper'
begin
  Email.asir.transport = t =
    ASIR::Transport::TcpSocket.new(:port => 30910)
  t.encoder =
    ASIR::Coder::Marshal.new
  t.prepare_server!
  server_process do
    t.run_server!
  end
  pr Email.asir.do_raise("Raise Me!")
rescue Exception => err
  pr [ :exception, err ]
ensure
  t.close; sleep 1; server_kill
end

Socket service with forwarded exception. – Output

*** 10553: client process
*** 10555: server process
*** 10553: pr: [:exception, #<RuntimeError: Raise Me!>]

          
        

Example Exception

Email.do_raise("DOH!")
#
# ->
result.exception = ee = EncapsulatedException.new(...)
ee.exception_class = "::RuntimeError"
ee.exception_message = "DOH!"
ee.exception_backtrace = [ ... ]

Encapsulated Exception

Encapsulates exceptions raised in the Service.

  class EncapsulatedException
    include ObjectResolving, AdditionalData
    attr_accessor :exception_class, :exception_message, :exception_backtrace

    def initialize exc
      @exception_class     = exc.class.name
      @exception_message   = exc.message
      @exception_backtrace = exc.backtrace
    end

    def invoke!
      raise resolve_object(@exception_class), @exception_message, @exception_backtrace
    end
  end

Socket service with local fallback.

require 'example_helper'
begin
  File.unlink(service_log = "#{__FILE__}.service.log") rescue nil
  Email.asir.transport = t =
    ASIR::Transport::Fallback.new(:transports => [
      tcp = ASIR::Transport::TcpSocket.new(:port => 31911,
                                           :encoder => ASIR::Coder::Marshal.new),
      ASIR::Transport::Broadcast.new(:transports => [
        file = ASIR::Transport::File.new(:file => service_log,
                                         :encoder => ASIR::Coder::Yaml.new),
        ASIR::Transport::Subprocess.new,
      ]),
    ])
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com", :customer => @customer)
  server_process do
    tcp.prepare_server!
    tcp.run_server!
  end
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user2@email.com", :customer => @customer)
ensure
  file.close rescue nil;
  tcp.close rescue nil; sleep 1; server_kill
  puts "\x1a\n#{service_log.inspect} contents:"
  puts File.read(service_log)
end

Socket service with local fallback. – Output

*** 10557: client process
RETRY: 1: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 2: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 3: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 4: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 5: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\

MORE

Socket service with local fallback. – Output – Page 2

ion refused - connect(2)>>
RETRY: 6: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 7: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 8: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 9: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
FAILED: 10: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect t\
o ASIR::Transport::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Conne\

MORE

Socket service with local fallback. – Output – Page 3

ction refused - connect(2)>>
  10557 ASIR::Transport::Fallback :send_message, :transport_failed, #<Errno::ECO\
NNREFUSED: Connection refused - Cannot connect to ASIR::Transport::TcpSocket tcp\
://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connection refused - connect(2)>>
*** 10557: pr: nil
*** 10560: server process
*** 10559: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10560: Email.send_mail :pdf_invoice {:to=>"user2@email.com", :customer=>123}
*** 10557: pr: :ok

MORE

Socket service with local fallback. – Output – Page 4

".riterate/ex11.rb-1-capture.rb.service.log" contents:
# asir_payload_size: 388
--- !ruby/object:ASIR::Message 
additional_data: 
  :transport_exceptions: 
  - "#<Errno::ECONNREFUSED: Connection refused - Cannot connect to ASIR::Transpo\
rt::TcpSocket tcp://127.0.0.1:31911: #<Errno::ECONNREFUSED: Connection refused -\
 connect(2)>>"
arguments: 
- :pdf_invoice
- :to: user@email.com
  :customer: 123
one_way: 
receiver: Email
receiver_class: Module

MORE

Socket service with local fallback. – Output – Page 5

selector: :send_email

# asir_payload_end

Implementation

  • Primary Base classes: Transport, Coder.
  • Primary API: Proxy via Client mixin.
  • Handful of mixins.

Transport#send_message

  • Encode Message.
  • Send encoded Message.
  • Receive decoded Result.
  class Transport
    def send_message message
      @message_count ||= 0; @message_count += 1
      message.create_timestamp! if needs_message_timestamp? message
      message.create_identifier! if needs_message_identifier? message
      @before_send_message.call(self, message) if @before_send_message
      relative_message_delay! message
      message_payload = encoder.dup.encode(message)
      opaque_result = _send_message(message, message_payload)
      receive_result(message, opaque_result)
    end
  end

Local Transport

Send Message to same process.
Requires Identity Coder.

    class Local < self
      # Returns Result object after invoking Message.
      def _send_message message, message_payload
        invoke_message!(message)
      end

      # Returns Result object from #send_message.
      def _receive_result message, opaque_result
        opaque_result
      end
    end

Null Transport

Never send Message.

    class Null < self
      def _send_message message, message_payload
        nil
      end
    end

YAML Coder

Use YAML for encode/decode.

    class Yaml < self
      def _encode obj
        case obj
        when Message, Result
          obj = obj.encode_more!
        end
        ::YAML::dump(obj)
      rescue ::Exception
        require 'pp'
        msg = "#{self}: failed to encode: #{$!.inspect}:\n  #{PP.pp(obj, '')}"
        $stderr.puts msg
        raise Error, msg
      end

      def _decode obj
        case obj = ::YAML::load(obj)
        when Message, Result
          obj.decode_more!
        else
          obj
        end
      end
    end # class
  end # class
end # module

require 'asir'

require 'zlib'

module ASIR
  class Coder
    class Zlib < self
      attr_accessor :compression_level

      def _encode obj
        raise TypeError unless String === obj
        ::Zlib::Deflate.deflate(obj, @compression_level || ::Zlib::DEFAULT_COMPRESSION)
      end
      def _decode obj
        raise TypeError unless String === obj
        ::Zlib::Inflate.inflate(obj)
      end
    end
  end
end

module ASIR

File Transport

Send Message one-way to a file.
Can be used as a log or named pipe service.

    class File < Stream
      include PayloadIO # _write, _read
      attr_accessor :file, :mode, :perms, :stream

      def initialize opts = nil; @one_way = true; super; end

      # Writes a Message payload String.
      def _send_message message, message_payload
        _write message_payload, stream
      ensure
        close if ::File.pipe?(file)
      end

      # Returns a Message payload String.
      def _receive_message stream, additional_data
        [ _read(stream), nil ]
      end

      # one-way; no Result.
      def _send_result message, result, result_payload, stream, message_state
        nil
      end

      # one-way; no Result.
      def _receive_result message, opaque_result
        nil
      end

      # File Transport Support
      def stream; ...; end 
      # Process (receive) messages from a file.
      def serve_file!; ...; end 
      # Named Pipe Server
      def prepare_server!; ...; end 
    end

File Transport Support

    class File < Stream
      def stream
        @stream ||=
          begin
            stream = ::File.open(file, mode || "w+")
            ::File.chmod(perms, file) rescue nil if @perms
            after_connect!(stream) if respond_to?(:after_connect!)
            stream
          end
      end
    end

Payload IO for Transport

Framing

  • Header line containing the number of bytes in the payload.
  • The payload bytes.
  • Blank line.
  • Footer.
    module PayloadIO
      class UnexpectedResponse < Error; end

      HEADER = "# asir_payload_size: "
      FOOTER = "\n# asir_payload_end"

      def _write payload, stream
        stream.write HEADER
        stream.puts payload.size
        stream.write payload
        stream.puts FOOTER
        stream.flush
        stream
      end

      def _read stream
        size = /\d+$/.match(stream.readline.chomp)[0].to_i # HEADER (size)
        payload = stream.read(size)
        stream.readline # FOOTER
        stream.readline
        payload
      end

      def _read_line_and_expect! stream, regexp
        line = stream.readline
        unless match = regexp.match(line)
          _log { "_read_line_and_expect! #{stream} #{regexp.inspect} !~ #{line.inspect}" }
          raise UnexpectedResponse, "expected #{regexp.inspect}, received #{line.inspect}"
        end
        match
      end

    end
  end
end
require 'asir/transport/http'
require 'rack'

module ASIR
  class Transport

Transport#receive_message

Receive Message payload from stream.

  class Transport
    def receive_message stream
      @message_count ||= 0; @message_count += 1
      additional_data = { }
      if req_and_state = _receive_message(stream, additional_data)
        message = req_and_state[0] = encoder.dup.decode(req_and_state.first)
        message.additional_data!.update(additional_data) if message
        if @after_receive_message
          begin
            @after_receive_message.call(self, message)
          rescue ::Exception => exc
            _log { [ :receive_message, :after_receive_message, :exception, exc ] }
          end
        end
      end
      req_and_state
    end
  end

Process (receive) messages from a file.

    class File < Stream
      def serve_file!
        ::File.open(file, "r") do | stream |
          @running = true
          _serve_stream! stream, nil # One-way: no result stream.
        end
      end
    end

Stream Transport

Base class handles Messages on a stream.
Stream Transports require a Coder that encodes to and from String payloads.

    class Stream < self

      # Serve all Messages from a stream.
      def serve_stream! in_stream, out_stream; ...; end 
      # Serve a Message from a stream.
      def serve_stream_message! in_stream, out_stream; ...; end 

Serve all Messages from a stream.

    class Stream < self
      def serve_stream! in_stream, out_stream
        with_server_signals! do
          @running = true
          _serve_stream! in_stream, out_stream
        end
      end

      def _serve_stream! in_stream, out_stream
        while @running && ! stream_eof?(in_stream)
          begin
            serve_stream_message! in_stream, out_stream
          rescue Error::Terminate => err
            @running = false
            _log [ :serve_stream_terminate, err ]
          rescue ::Exception => err
            _log [ :serve_stream_error, err ]
          end
        end
      end

      # Subclasses can override this method.
      def stream_eof? stream
        stream.eof?
      end
    end

Serve a Message from a stream.

    class Stream < self
      def serve_stream_message! in_stream, out_stream
        serve_message! in_stream, out_stream
      end
    end
    end

Marshal Coder

Use Ruby Marshal for encode/decode.

    class Marshal < self
      def _encode obj
        ::Marshal.dump(obj)
      end

      def _decode obj
        ::Marshal.load(obj)
      end
    end # class

Sign Coder

Sign payload during encode, check signature during decode.

Signature is the digest of secret + payload.

Encode payload as Hash containing the digest function name, signature and payload.
Decode and validate Hash containing the digest function name, signature and payload.

    class Sign < self
      attr_accessor :secret, :function

      def _encode obj
        payload = obj.to_s
        { :function  => function,
          :signature => ::Digest.const_get(function).
                          new.hexdigest(secret + payload),
          :payload   => payload }
      end

      def _decode obj
        raise SignatureError, "expected Hash, given #{obj.class}" unless Hash === obj
        payload = obj[:payload]
        raise SignatureError, "signature invalid" unless obj == _encode(payload)
        payload
      end

      # Sign Coder Support
      class SignatureError < Error; end; ...; end 
    end 

TCP Socket Transport

    class TcpSocket < ConnectionOriented
      # TCP Socket Client
      def _client_connect!; ...; end 
      # TCP Socket Server
      def _server!; ...; end 

TCP Socket Server

    class TcpSocket < ConnectionOriented
      def _server!
        @server = TCPServer.open(port)
        @server.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, false)
      end

      def _server_accept_connection! server
        socket = server.accept
        [ socket, socket ] # Use same socket for in_stream and out_stream
      end

      def _server_close_connection! stream, out_stream
        stream.close rescue nil
      end
    end
    end

TCP Socket Client

    class TcpSocket < ConnectionOriented
      def _client_connect!
        sock = TCPSocket.open(address, port)
      end
    end

Transport#send_result

Send Result to stream.

  class Transport
    def send_result result, stream, message_state
      message = result.message
      if @one_way && message.block
        message.block.call(result)
      else
        result.message = nil # avoid sending back entire Message.
        result_payload = decoder.dup.encode(result)
        _send_result(message, result, result_payload, stream, message_state)
      end
    end
  end

Transport#receive_result

Receieve Result from stream:

  • Receive Result payload
  • Decode Result.
  • Extract Result result or exception.
  class Transport
    def receive_result message, opaque_result
      result_payload = _receive_result(message, opaque_result)
      result = decoder.dup.decode(result_payload)
      if result && ! message.one_way
        if exc = result.exception
          invoker.invoke!(exc, self)
        else
          if ! @one_way && message.block
            message.block.call(result)
          end
          result.result
        end
      end
    end
  end

Sends the encoded Message payload String.

    class Beanstalk < TcpSocket
      def _send_message message, message_payload
        stream.with_stream! do | s |
          begin
            match = 
              _beanstalk(s, 
                         "put #{message[:beanstalk_priority] || @priority} #{message[:beanstalk_delay] || @delay} #{message[:beanstalk_ttr] || @ttr} #{message_payload.size}\r\n",
                         /\AINSERTED (\d+)\r\n\Z/,
                         message_payload)
            job_id = message[:beanstalk_job_id] = match[1].to_i
            _log { "beanstalk_job_id = #{job_id.inspect}" } if @verbose >= 2
          rescue ::Exception => exc
            message[:beanstalk_error] = exc
            close
            raise exc
          end
        end
      end

      RESERVE = "reserve\r\n".freeze
    end

Receives the encoded Message payload String.

    class Beanstalk < TcpSocket
      def _receive_message channel, additional_data
        channel.with_stream! do | stream |
          begin
            match = 
              _beanstalk(stream,
                         RESERVE,
                         /\ARESERVED (\d+) (\d+)\r\n\Z/)
            additional_data[:beanstalk_job_id] = match[1].to_i
            additional_data[:beanstalk_message_size] = 
              size = match[2].to_i
            message_payload = stream.read(size)
            _read_line_and_expect! stream, /\A\r\n\Z/
            # Pass the original stream used to #_send_result below.
            [ message_payload, stream ]
          rescue ::Exception => exc
            _log { [ :_receive_message, :exception, exc ] }
            additional_data[:beanstalk_error] = exc
            channel.close
          end
        end
      end
    end

Sends the encoded Result payload String.

    class Beanstalk < TcpSocket
      def _send_result message, result, result_payload, channel, stream
        #
        # There is a possibility here the following could happen:
        #
        #   _receive_message
        #     channel == #<Channel:1>   
        #     channel.stream == #<TCPSocket:1234>
        #   end
        #   ...
        #   ERROR OCCURES:
        #      channel.stream.close
        #      channel.stream = nil
        #   ...
        #   _send_result 
        #     channel == #<Channel:1>
        #     channel.stream == #<TCPSocket:5678> # NEW CONNECTION
        #     stream.write "delete #{job_id}"
        #   ...
        #
        # Therefore: _receiver_message passes the original message stream to us.
        # We insure that the same stream is still the active one and use it.
        channel.with_stream! do | maybe_other_stream |
          _log [ :_send_result, "stream lost" ] if maybe_other_stream != stream
          job_id = message[:beanstalk_job_id] or raise "no beanstalk_job_id"
          _beanstalk(stream,
                     "delete #{job_id}\r\n",
                     /\ADELETED\r\n\Z/)
        end
      end
    end

Receives the encoded Result payload String.

    class Beanstalk < TcpSocket
      def _receive_result message, opaque_result
        nil
      end
    end

Fallback Transport

    class Fallback < self
      include Composite

      def _send_message message, message_payload
        result = sent = first_exception = nil
        transports.each do | transport |
          begin
            result = transport.send_message(message)
            sent = true
            break
          rescue ::Exception => exc
            first_exception ||= exc
            _handle_send_message_exception! transport, message, exc
          end
        end
        unless sent
          if first_exception && @reraise_first_exception
            $! = first_exception
            raise
          end
          raise FallbackError, "fallback failed"
        end
        result
      end
      class FallbackError < Error; end
    end

Buffer Transport

Buffers Messages until #flush!
Assumes One-way Messages.

    class Buffer < self
      include Delegation

      # Transport to send_message.
      attr_accessor :transport

      def initialize *args
        super
        @messages = Queue.new
        @messages_mutex = Mutex.new
        @paused = 0
        @paused_mutex = Mutex.new
      end

      # If paused, queue messages,
      # Otherwise delegate immediately to #transport.
      def _send_message message, message_payload
        return nil if @ignore
        if paused?
          @messages_mutex.synchronize do
            @messages << message
          end
          nil
        else
          @transport.send_message(message)
        end
      end

      # Returns true if currently paused.
      # Messages are queued until #resume!.
      def paused?
        @paused > 0
      end

      # Pauses all messages until resume!.
      # May be called multiple times.
      def pause!
        @paused_mutex.synchronize do
          @paused += 1
        end
        self
      end

      # Will automatically call #flush! when not #paused?.
      def resume!
        should_flush = @paused_mutex.synchronize do
          @paused -= 1 if @paused > 0
          @paused == 0
        end
        flush! if should_flush
        self
      end

      def size
        @messages_mutex.synchronize do
          @messages.size
        end
      end

      # Will flush pending Messages even if ! #paused?.
      def flush!
        clear!.each do | message |
          @transport.send_message(message)
        end
        self
      end

      # Clear all pending Messages without sending them.
      # Returns Array of Messages that would have been sent.
      def clear!
        messages = [ ]
        @messages_mutex.synchronize do
          @messages.size.times do
            messages << @messages.shift(true)
          end
        end
        messages
      end

      # Take Message from head of Queue.
      def shift non_block=false
        @messages.shift(non_block)
      end

      # Processes queue.
      # Usually used in worker Thread.
      def process! non_block=false
        @running = true
        while @running && message = shift(non_block)
          @transport.send_message(message)
        end
        message
      end

      # Stop processing queue.
      def stop!
        @messages_mutex.synchronize do
          @ignore = true; @running = false
        end
        self
      end
    end

Synopsis

  • Services are easy to abstract away.
  • Separation of transport, encoding.

Appendix

Modules and Classes

module ASIR
  # Reusable constants to avoid unnecessary garbage.
  EMPTY_ARRAY = [ ].freeze; EMPTY_HASH =  { }.freeze; EMPTY_STRING = ''.freeze
  MODULE_SEP = '::'.freeze; IDENTITY_LAMBDA = lambda { | x | x }
end

Object Initialization

Support initialization by Hash.

E.g.:

  Foo.new(:bar => 1, :baz => 2)

  obj = Foo.new; obj.bar = 1; obj.baz = 2; obj
  module Initialization
    def initialize opts = nil
      opts ||= EMPTY_HASH
      initialize_before_opts if respond_to? :initialize_before_opts
      opts.each do | k, v |
        send(:"#{k}=", v)
      end
      initialize_after_opts if respond_to? :initialize_after_opts
    end
  end
end

module ASIR

Diagnostic Logging

Logging mixin.

  module Log
    attr_accessor :_logger

    def self.included target
      super
      target.send(:extend, ClassMethods)
    end

    @@enabled = false
    def self.enabled
      @@enabled
    end
    def self.enabled= x
      @@enabled = x
    end

    module ClassMethods
      def _log_enabled= x
        (Thread.current[:'ASIR::Log.enabled'] ||= { })[self] = x
      end
      def _log_enabled?
        (Thread.current[:'ASIR::Log.enabled'] ||= { })[self]
      end
    end

    def _log_enabled= x
      @_log_enabled = x
    end

    def _log_enabled?
      ASIR::Log.enabled || 
        @_log_enabled || 
        self.class._log_enabled?
    end

    def _log msg = nil
      return unless _log_enabled?
      msg ||= yield if block_given?
      msg = String === msg ? msg : _log_format(msg)
      msg = "  #{$$} #{Module === self ? self : self.class} #{msg}"
      case @_logger
      when Proc
        @_logger.call msg
      when IO
        @_logger.puts msg
      else
        $stderr.puts msg
      end
      nil
    end

    def _log_result msg
      _log { 
        msg = String === msg ? msg : _log_format(msg);
        "#{msg} => ..." }
      result = yield
      _log { "#{msg} => \n    #{result.inspect}" }
      result
    end

    def _log_format obj
      case obj
      when Exception
        msg = "#{obj.inspect}"
        msg << "\n    #{obj.backtrace * "\n    "}" if false
        msg
      when Array
        obj.map { | x | _log_format x } * ", "
      else
        obj.inspect
      end
    end
  end # module
end # module

require 'asir'
require 'asir/environment'
require 'time'

module ASIR
class Main
  attr_accessor :env, :args, :exit_code
  # Delegate getter/setters to @env.
  [ :verb, :adjective, :object, :identifier,
    :config_rb,
    :verbose,
    :log_dir, :log_file,
    :pid_dir, :pid_file,
  ].
    map{|g| [ g, :"#{g}=" ]}.
    flatten.each do | m |
      define_method(m) { | *args | @env.send(m, *args) }
    end
  attr_accessor :progname
  # Options:
  attr_accessor :force

  # Transport selected from asir.phase = :transport.
  attr_accessor :transport

  def initialize
    self.env = ASIR::Environment.new
    self.progname = File.basename($0)
    self.exit_code = 0
  end

  def parse_args! args = ARGV.dup
    self.args = args
    until args.empty?
      case args.first
      when /^([a-z0-9_]+=)(.*)/i
        k, v = $1.to_sym, $2
        args.shift
        v = v.to_i if v == v.to_i.to_s
        send(k, v)
      else
        break
      end
    end
    self.verb, self.adjective, self.object, self.identifier = args.map{|x| x.to_sym}
    self.identifier ||= :'0'
    self
  end

  def config! *args
    @env.config! *args
  end

  def log_str
    "#{Time.now.gmtime.iso8601(4)} #{$$} #{log_str_no_time}"
  end

  def log_str_no_time
    "#{progname} #{verb} #{adjective} #{object} #{identifier}"
  end

  def run!
    unless verb && adjective && object
      self.exit_code = 1
      return usage!
    end
    config!(:configure)
    # $stderr.puts "log_file = #{log_file.inspect}"
    case self.verb
    when :restart
      self.verb = :stop
      _run_verb! && sleep(1)
      self.verb = :start
      _run_verb!
    else
      _run_verb!
    end
    self
  rescue ::Exception => exc
    $stderr.puts "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    self.exit_code += 1
    self
  end

  def _run_verb!
    sel = :"#{verb}_#{adjective}_#{object}!"
    if verbose >= 3
      $stderr.puts "  verb      = #{verb.inspect}"
      $stderr.puts "  adjective = #{adjective.inspect}"
      $stderr.puts "  object    = #{object.inspect}"
      $stderr.puts "  sel       = #{sel.inspect}"
    end
    send(sel)
  rescue ::Exception => exc
    $stderr.puts "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    self.exit_code += 1
    raise
    nil
  end

  def method_missing sel, *args
    log "method_missing #{sel}" if verbose >= 3
    case sel.to_s
    when /^start_([^_]+)_worker!$/
      _start_worker!
    when /^status_([^_]+)_([^_]+)!$/
      pid = server_pid
      puts "#{log_str} pid #{pid}"
      system("ps -fw -p #{pid}")
    when /^log_([^_]+)_([^_]+)!$/
      puts log_file
    when /^taillog_([^_]+)_([^_]+)!$/
      exec "tail -f #{log_file.inspect}"
    when /^pid_([^_]+)_([^_]+)!$/
      pid = server_pid rescue nil
      alive = process_running? pid
      puts "#{pid_file} #{pid || :NA} #{alive}"
    when /^alive_([^_]+)_([^_]+)!$/
      pid = server_pid rescue nil
      alive = process_running? pid
      puts "#{pid_file} #{pid || :NA} #{alive}" if @verbose
      self.exit_code += 1 unless alive
    when /^stop_([^_]+)_([^_]+)!$/
      kill_server!
    else
      super
    end
  end

  def usage!
    $stderr.puts <<"END"
SYNOPSIS:
  asir [ <<options>> ... ] <<verb>> <<adjective>> <<object>> [ <<identifier>> ]

OPTIONS:
  config_rb=file.rb ($ASIR_LOG_DIR)
  pid_dir=dir/      ($ASIR_PID_DIR)
  log_dir=dir/      ($ASIR_LOG_DIR)
  verbose=[0-9]

VERBS:
  start
  stop
  restart
  status
  log
  pid
  alive

ADJECTIVE-OBJECTs:
  beanstalk conduit
  beanstalk worker
  zmq worker
  webrick worker
  resque conduit
  resque worker

EXAMPLES:

  export ASIR_CONFIG_RB="some_system/asir_config.rb"
  asir start beanstalk conduit
  asir status beanstalk conduit

  asir start webrick worker
  asir pid   webrick worker

  asir start beanstalk worker 1
  asir start beanstalk worker 2

  asir start zmq worker
  asir start zmq worker 1
  asir start zmq worker 2
END
  end

  def start_beanstalk_conduit!
    _start_conduit!
  end

  def start_resque_conduit!
    _start_conduit!
  end

  def _start_conduit!
    config!(:environment)
    self.transport = config!(:transport)
    fork_server! do
      transport.start_conduit! :fork => false
    end
  end

  def _start_worker! type = adjective
    log "start_worker! #{type}"
    type = type.to_s
    fork_server! do
      transport_file = "asir/transport/#{type}"
      log "loading #{transport_file}"
      require transport_file
      _create_transport ASIR::Transport.const_get(type[0..0].upcase + type[1..-1])
      _run_workers!
    end
  end

  def fork_server! cmd = nil, &blk
    pid = Process.fork do
      run_server! cmd, &blk
    end
    log "forked pid #{pid}"
    Process.detach(pid) # Forks a Thread?  We are gonna exit anyway.
    File.open(pid_file, "w+") { | o | o.puts pid }
    File.chmod(0666, pid_file) rescue nil

    # Wait and check if process still exists.
    sleep 3
    unless process_running? pid
      raise "Server process #{pid} died to soon?"
    end

    self
  end

  def run_server! cmd = nil
    lf = File.open(log_file, "a+")
    File.chmod(0666, log_file) rescue nil
    $stdin.close rescue nil
    STDIN.close rescue nil
    STDOUT.reopen(lf)
    $stdout.reopen(lf) if $stdout.object_id != STDOUT.object_id
    STDERR.reopen(lf)
    $stderr.reopen(lf) if $stderr.object_id != STDERR.object_id
    # Process.daemon rescue nil # Ruby 1.9.x only.
    lf.puts "#{log_str} starting pid #{$$}"
    begin
      if cmd
        exec(cmd)
      else
        yield
      end
    ensure
      lf.puts "#{log_str} finished pid #{$$}"
      File.unlink(pid_file) rescue nil
    end
    self
  rescue ::Exception => exc
    msg = "ERROR pid #{$$}\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    log msg, :stderr
    raise
    self
  end

  def kill_server!
    log "#{log_str} kill"
    pid = server_pid
    stop_pid! pid
  rescue ::Exception => exc
    log "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}", :stderr
    raise
  end

  def log msg, to_stderr = false
    if to_stderr
      $stderr.puts "#{log_str_no_time} #{msg}"
    end
    File.open(log_file, "a+") do | log |
      log.puts "#{log_str} #{msg}"
    end
  end

  def server_pid
    pid = File.read(pid_file).chomp!
    pid.to_i
  end

  def _create_transport default_class
    config!(:environment)
    case transport = config!(:transport)
    when default_class
      self.transport = transport
    else
      raise "Expected config to return a #{default_class}, not a #{transport.class}"
    end
  end

  def worker_pids
    (@worker_pids ||= { })[adjective] ||= { }
  end

  def _run_workers!
    $0 = "#{progname} #{adjective} #{object} #{identifier}"

    worker_id = 0
    transport.prepare_server!
    worker_processes = transport[:worker_processes] || 1
    (worker_processes - 1).times do
      wid = worker_id += 1
      pid = Process.fork do
        _run_transport_server! wid
      end
      Process.setgprp(pid, 0) rescue nil
      worker_pids[wid] = pid
      log "forked #{wid} pid #{pid}"
    end

    _run_transport_server!
  ensure
    log "worker 0 stopped"
    _stop_workers!
  end

  def _run_transport_server! wid = 0
    log "running transport worker #{transport.class} #{wid}"
    config!(:start)
    $0 += " #{wid} #{transport.uri rescue nil}"
    old_arg0 = $0.dup
    after_receive_message = transport.after_receive_message || lambda { | transport, message | nil }
    transport.after_receive_message = lambda do | transport, message |
      $0 = "#{old_arg0} #{transport.message_count} #{message.identifier}"
      after_receive_message.call(transport, message)
    end
    transport.run_server!
    self
  end

  def _stop_workers!
    workers = worker_pids.dup
    worker_pids.clear
    workers.each do | wid, pid |
      config!(:stop)
      stop_pid! pid, "wid #{wid} "
    end
    workers.each do | wid, pid |
      wr = Process.waitpid(pid) rescue nil
      log "stopped #{wid} pid #{pid} => #{wr.inspect}", :stderr
    end
  ensure
    worker_pids.clear
  end

  def stop_pid! pid, msg = nil
    log "stopping #{msg}pid #{pid}", :stderr
    if process_running? pid
      log "TERM pid #{pid}"
      Process.kill('TERM', pid) rescue nil
      sleep 3
      if force or process_running? pid
        log "KILL pid #{pid}", :stderr
        Process.kill('KILL', pid) rescue nil
      end
      if process_running? pid
        log "cant-stop pid #{pid}", :stderr
      end
    else
      log "not-running? pid #{pid}", :stderr
    end
  end

  def process_running? pid
    case pid
    when false, nil
      pid
    when Integer
      Process.kill(0, pid)
    else
      raise TypeError, "expected false, nil, Integer; given #{pid.inspect}"
    end
    true
  rescue ::Errno::ESRCH
    false
  rescue ::Exception => exc
    $stderr.puts "  DEBUG: process_running? #{pid} => #{exc.inspect}"
    false
  end

end # class
end # module

module ASIR
  class Message
    module Delay
      # Returns the number of seconds from now, that the message should be delayed.
      # If message.delay is Numeric, sets message.delay to the Time to delay til.
      # If message.delay is Time, returns (now - message.delay).to_f
      # Returns Float if message.delay was set, or nil.
      # Returns 0 if delay has already expired.
      def relative_message_delay! message, now = nil
        case delay = message.delay
        when nil
        when Numeric
          now ||= Time.now
          delay = delay.to_f
          message.delay = (now + delay).utc
        when Time
          now ||= Time.now
          delay = (delay - now).to_f
          delay = 0 if delay < 0
        else
          raise TypeError, "Expected message.delay to be Numeric or Time, given #{delay.class}"
        end
        delay
      end

      def wait_for_delay! message
        while (delay = relative_message_delay!(message)) && delay > 0 
          sleep delay
        end
        self
      end

    end
  end
end

module ASIR

Null Coder

Always encode/decode as nil.

    class Null < self
      def _encode obj
        nil
      end

      def _decode obj
        nil
      end

      # Completely stateless.
      def dup; self; end
    end

Sign Coder Support

Signature Error.

    class Sign < self
      class SignatureError < Error; end

      def initialize_before_opts
        @function = :SHA1
      end
    end

Broadcast Transport

Broadcast to multiple Transports.

    class Broadcast < self
      include Composite

      def _send_message message, message_payload
        result = first_exception = nil
        transports.each do | transport |
          begin
            result = transport.send_message(message)
          rescue ::Exception => exc
            first_exception ||= exc
            _handle_send_message_exception! transport, message, exc
            raise unless @continue_on_exception
          end
        end
        if first_exception && @reraise_first_exception
          $! = first_exception
          raise
        end
        result
      end

    end

HTTP Transport

Using HTTPClient.

    class HTTP < self
      attr_accessor :uri, :server, :debug

      # Client-side: HTTPClient

      def client
        @client ||=
          Channel.new(:on_connect =>
            lambda { | channel | ::HTTPClient.new })
      end

      def close
        @client.close if @client
      ensure
        @client = nil unless Channel === @client
      end

      # Send the Message payload String using HTTP POST.
      # Returns the HTTPClient::Request response object.
      def _send_message message, message_payload
        client.with_stream! do | client |
          client.post(uri, message_payload)
        end
      end

      # Recieve the Result payload String from the opaque
      # HTTPClient::Request response object returned from #_send_message.
      def _receive_result message, http_result_message
        # $stderr.puts " ### http_result_message.content.encoding = #{http_result_message.content.encoding.inspect}" rescue nil
        # $stderr.puts " ### http_result_message.content = #{http_result_message.content.inspect}" rescue nil
        http_result_message.content.to_s
      end

      CONTENT_TYPE = 'Content-Type'.freeze
      APPLICATION_BINARY = 'application/binary'.freeze

    end

Transport Support

  class Transport
    def encoder
      @encoder ||=
        Coder::Identity.new
    end

    def decoder
      @decoder ||=
        encoder
    end

    # Invokes the Message object, returns a Result object.
    def invoke_message! message
      result = nil
      Transport.with_attr! :current, self do
        with_attr! :message, message do
          wait_for_delay! message
          result = invoker.invoke!(message, self)
          # Hook for Exceptions.
          if @on_result_exception && result.exception
            @on_result_exception.call(self, result)
          end
        end
      end
      result
    end
    # The current Message being handled.
    attr_accessor_thread :message

    # The current active Transport.
    cattr_accessor_thread :current

    # The Invoker responsible for invoking the Message.
    attr_accessor :invoker
    def invoker
      @invoker ||= Invoker.new
    end
  end

Message Identity

  module Identity
    attr_accessor :identifier, :timestamp

    # Optional: Opaque data about the Client that created the Message.
    attr_accessor :client

    # Optional: Opaque data about the Service that handled the Result.
    attr_accessor :server

    # Creates a thread-safe unique identifier.
    def create_identifier!
      @identifier ||= 
        @@identifier_mutex.synchronize do
          if @@uuid_pid != $$
            @@uuid_pid = $$
            @@uuid = nil
          end
          "#{@@counter += 1}-#{@@uuid ||= ::ASIR::UUID.generate}".freeze
        end
    end
    @@counter ||= 0; @@uuid ||= nil; @@uuid_pid = nil; @@identifier_mutex ||= Mutex.new

    # Creates a timestamp.
    def create_timestamp!
      @timestamp ||= 
        ::Time.now.gmtime
    end
  end
end

module ASIR

Message Delay Support

    module Delay
      # Returns the number of seconds from now, that the message should be delayed.
      # If message.delay is Numeric, sets message.delay to the Time to delay til.
      # If message.delay is Time, returns (now - message.delay).to_f
      # Returns Float if message.delay was set, or nil.
      # Returns 0 if delay has already expired.
      def relative_message_delay! message, now = nil
        case delay = message.delay
        when nil
        when Numeric
          now ||= Time.now
          delay = delay.to_f
          message.delay = (now + delay).utc
        when Time
          now ||= Time.now
          delay = (delay - now).to_f
          delay = 0 if delay < 0
        else
          raise TypeError, "Expected message.delay to be Numeric or Time, given #{delay.class}"
        end
        delay
      end

      def wait_for_delay! message
        while (delay = relative_message_delay!(message)) && delay > 0 
          sleep delay
        end
        self
      end

    end
  end
end
module ASIR
  class Transport

Code More

Help encode/decode and resolve receiver class.

  module CodeMore
    include ObjectResolving # resolve_object()
    include CodeBlock # encode_block!, decode_block!

    def encode_more!
      obj = encode_block! # may self.dup
      unless ::String === @receiver_class
        obj ||= self.dup # dont dup twice.
        obj.receiver = @receiver.name if ::Module === @receiver
        obj.receiver_class = @receiver_class.name
        if resp = obj.result and resp.message == self
          resp.message = obj
        end
      end
      obj || self
    end

    def decode_more!
      decode_block!
      if ::String === @receiver_class
        @receiver_class = resolve_object(@receiver_class)
        @receiver = resolve_object(@receiver) if ::Module === @receiver_class
        unless @receiver_class === @receiver
          raise Error, "receiver #{@receiver.class.name} is not a #{@receiver_class}" 
        end
      end
      self
    end

    # Mixin for Result.
    module Result
      def encode_more!
        @message = @message.encode_more! if @message
        self
      end

      def decode_more!
        @message = @message.decode_more! if @message
        self
      end
    end
  end

Client Mixin

  module Client
    def self.included target
      super
      target.extend ModuleMethods if Module === target
      target.send(:include, InstanceMethods) if Class === target
    end

    module CommonMethods
      def asir_options &blk
        asir._configure(&blk)
      end
      alias_method :"#{ASIR::Config.client_method}_options", :asir_options if ASIR::Config.client_method
    end

    module ModuleMethods
      include CommonMethods
      # Proxies are cached for Module/Class methods because serialization will not include
      # Transport.
      def asir
        @_asir ||= ASIR::Client::Proxy.new(self, self)
      end
      alias_method ASIR::Config.client_method, :asir if ASIR::Config.client_method
    end

    module InstanceMethods
      include CommonMethods
      # Proxies are not cached in instances because receiver is to be serialized by
      # its Transport's Coder.
      def asir
        proxy = self.class.asir.dup
        proxy.receiver = self
        proxy
      end
      alias_method ASIR::Config.client_method, :asir if ASIR::Config.client_method
    end
  end

Proxy Configuration

A Proc to call with the Message object before sending to transport#send_message(message).
Must return a Message object.

  module Client
    class Proxy
      attr_accessor :before_send_message

      # If true, this Message is one-way, even if the Transport is bi-directional.
      attr_accessor :_one_way

      # A Proc to call with the Message object before sending to transport#send_message(message).
      # See #_configure.
      attr_accessor :__configure

      # Returns a new Client Proxy with a block to be called with the Message.
      # This block can configure additional options of the Message before
      # it is sent to the Transport.
      #
      # Call sequence:
      #
      #   proxy.__configure.call(message, proxy).
      def _configure &blk
        proxy = @receiver == @receiver_class ? self.dup : self
        proxy.__configure = blk
        proxy
      end
      alias :_options :_configure
    end
  end

Addtional Data

Support additional data attached to any object.

  module AdditionalData
    def additional_data
      @additional_data || EMPTY_HASH
    end
    def additional_data!
      @additional_data ||= { }
    end
    def additional_data= x
      @additional_data = x
    end
    def [] key
      @additional_data && @additional_data[key]
    end
    def []= key, value
      (@additional_data ||= { })[key] = value
    end

    def self.included target
      super
      target.extend(ModuleMethods)
    end

    module ModuleMethods
      # Provide a getter method that delegates to addtional_data[...].
      def addr_getter *names
        names.each do | name |
          name = name.to_sym
          define_method(name) { | | addtional_data[name] }
        end
      end

      # Provide getter and setter methods that delegate to addtional_data[...].
      def addr_accessor *names
        addr_getter *names
        names.each do | name |
          name = name.to_sym
          define_method(:"#{name}=") { | v | addtional_data[name] = v }
        end
      end
    end

  end

Configuration Callbacks

  module Client
    class Proxy
      def initialize rcvr, rcvr_class
        @receiver = rcvr
        @receiver_class = rcvr_class
        _configure!
      end

      def _configure!
        key = @receiver_class
        (@@config_callbacks[key] ||
         @@config_callbacks[key.name] ||
         @@config_callbacks[nil] ||
         IDENTITY_LAMBDA).call(self)
        self
      end

      @@config_callbacks ||= { }
      def self.config_callbacks
        @@config_callbacks
      end
    end
  end

Code Block

Encode/decode Message#block.

  module CodeBlock
    # Most coders cannot serialize Procs.
    # But we can attempt to serialize a String representing a Proc.
    def encode_block!
      obj = nil
      if @block && ! ::String === @block_code
        obj ||= self.dup
        obj.block_code = CodeBlock.block_to_code(obj.block)
        obj.block = nil 
      end
      obj
    end

    def decode_block!
      if ::String === @block_code
        @block ||= CodeBlock.code_to_block(@block_code)
        @block_code = nil
      end
      self
    end

    # Returns a block_cache Hash.
    # Flushed every 1000 accesses.
    def self.block_cache
      cache = Thread.current[:'ASIR::CodeBlock.block_cache'] ||= { }
      count = Thread.current[:'ASIR::CodeBlock.block_cache_count'] ||= 0
      count += 1
      if count >= 1000
        cache.clear
        count = 0
      end
      Thread.current[:'ASIR::CodeBlock.block_cache_count'] = count
      cache
    end

    # Uses ruby2ruby, if loaded.
    def self.block_to_code block
      (block_cache[block.object_id] ||=
        [ block.respond_to?(:to_ruby) && block.to_ruby, block ]).
        first
    end

    # Calls eval.
    # May be unsafe.
    def self.code_to_block code
      (block_cache[code.dup.freeze] ||=
        [ eval(@block_code), code ]).
        first
    end

  end
end
module ASIR

JSON Coder

Note: Symbols are not handled.
The actual JSON expression is wrapped with an Array.

    class JSON < self
      def _encode obj
        [ obj ].to_json
      end

      def _decode obj
        parser = ::JSON.parser.new(obj)
        ary = parser.parse
        ary.first
      end
    end

Proc Coder

Generic Proc-based coder.

    class Proc < self
      # Procs that take one argument.
      attr_accessor :encoder, :decoder

      def _encode obj
        @encoder.call(obj)
      end
      def _decode obj
        @decoder.call(obj)
      end
    end

XML

Encode/Decode objects as XML.

    class XML < self
      class Error < ::ASIR::Error
        class BadIdref < self; end
      end
      def _encode obj
        @stream = ''
        @dom_id_map = { }
        @dom_id = 0
        @cls_tag_map = { }
        encode_dom obj
        @stream
      end

      def _decode obj
        @stream = obj
        @decoder ||= DECODER; @decoder_object = nil
        @dom_id_map = { }
        @dom_id = 0
        @cls_tag_map = { }
        @parser = ::XML::Parser.string(@stream)
        @dom = @parser.parse
        decode_dom @dom.root
      end

      def encode_dom obj
        if dom_id = @dom_id_map[obj.object_id]
          tag_obj(obj, nil, :idref => dom_id.first)
        else
          _encode_dom obj
        end
      end

      def _encode_dom obj
        case obj
        when NilClass, TrueClass, FalseClass
          tag_obj(obj)
        when Numeric
          tag_obj(obj, nil, :v => obj.to_s)
        when Symbol
          tag_obj(obj) do
            @stream << obj.to_s
          end
        when String
          tag_obj(obj, :id) do 
            @stream << obj.to_s
          end
        when Array
          tag_obj(obj, :id) do 
            obj.each do | elem |
              encode_dom elem
            end
          end
        when Hash
          tag_obj(obj, :id) do 
            obj.each do | key, val |
              encode_dom key
              encode_dom val
            end
          end
        else
          tag_obj(obj, :id) do 
            obj.instance_variables.each do | attr |
              val = obj.instance_variable_get(attr)
              key = attr.to_s.sub(/^@/, '')
              tag(key) do 
                encode_dom val
              end
            end
          end
        end
      end

      def tag_obj obj, with_id = false, attrs = nil
        if block_given?
          tag(cls_tag(obj), with_id ? { with_id => map_obj_to_dom_id!(obj) } : nil) do
            yield
          end
        else
          tag(cls_tag(obj), attrs)
        end
      end

      CC = '::'.freeze; D = '.'.freeze
      def cls_tag obj
        obj = obj.class
        @cls_tag_map[obj] ||= obj.name.gsub(CC, D).freeze
      end

      def map_obj_to_dom_id! obj
        if dom_id = @dom_id_map[obj.object_id]
          dom_id.first
        else
          @dom_id_map[obj.object_id] = [ @dom_id += 1, obj ]
          @dom_id
        end
      end

      B  = '<'.freeze;  S = ' '.freeze; E = '>'.freeze; SE = '/>'.freeze
      BS = '</'.freeze; A = '='.freeze
      def tag tag, attrs = nil
        tag = tag.to_s
        @stream << B << tag << S
        if attrs
          attrs.each do | key, val |
            @stream << key.to_s << A << val.to_s.inspect << S
          end
        end
        if block_given?
          @stream << E; yield; @stream << BS << tag << E
        else
          @stream << SE
        end
      end

      ################################################################

      def decode_dom dom
        if dom_id = dom.attributes[:idref]
          unless obj = @dom_id_map[dom_id]
            raise Error::BadIdref, "in element #{dom}"
          end
          obj
        else
          obj = _decode_dom(dom)
          map_dom_id_to_obj! dom, obj if dom.attributes[:id]
          obj
        end
      end

      def _decode_dom dom
        cls_name = dom.name
        decoder = @decoder[cls_name] || 
          (@decoder_object ||= @decoder['Object'])
        raise Error, "BUG: " unless decoder
        decoder.call(self, dom)
      end

      DECODER = {
        'NilClass'   => lambda { | _, dom | nil },
        'TrueClass'  => lambda { | _, dom | true },
        'FalseClass' => lambda { | _, dom | false },
        'String'     => lambda { | _, dom | (dom.attributes[:v] || dom.content) },
        'Symbol'     => lambda { | _, dom | (dom.attributes[:v] || dom.content).to_sym },
        'Integer'    => lambda { | _, dom | (dom.attributes[:v] || dom.content).to_i },
        'Float'      => lambda { | _, dom | (dom.attributes[:v] || dom.content).to_f },
        "Array" => lambda { | _, dom | 
          obj = [ ]
          _.map_dom_id_to_obj! dom, obj
          dom.each_element do | elem |
            obj << _.decode_dom(elem)
          end
          obj
        },
        'Hash' => lambda { | _, dom |
          obj = { }
          _.map_dom_id_to_obj! dom, obj
          key = nil
          dom.each_element do | val |
            if key
              obj[_.decode_dom(key)] = _.decode_dom(val)
              key = nil
            else
              key = val
            end
          end
          obj
        },
        'Object' => lambda { | _, dom |
          cls_name = dom.name
          # $stderr.puts "cls_name = #{cls_name.inspect}"
          cls = _.tag_cls(cls_name)
          obj = cls.allocate
          _.map_dom_id_to_obj! dom, obj
          dom.each_element do | child |
            key = child.name
            val = _.decode_dom child.first
            obj.instance_variable_set("@#{key}", val)
          end
          obj
        },
      }
      DECODER['Fixnum'] = DECODER['Bignum'] = DECODER['Integer']

      def map_dom_id_to_obj! dom, obj
        dom_id = dom.attributes[:id]
        debugger unless dom_id
        raise Error, "no :id attribute in #{dom}" unless dom_id
        if (other_obj = @dom_id_map[dom_id]) and other_obj.object_id != obj.object_id
          raise Error, "BUG: :id #{dom_id} already used for #{other_obj.class.name} #{other_obj.inspect}"
        end
        @dom_id_map[dom_id] = obj
      end

      include ObjectResolving
      def tag_cls cls_name
        @cls_tag_map[cls_name.freeze] ||= resolve_object(cls_name.gsub('.', '::'))
      end

    end

Invoker

Invokes the Message or Exception on behalf of a Transport.

  class Invoker
    include Initialization, AdditionalData

    def invoke! message, transport
      message.invoke!
    end
  end
end

module ASIR

Object Resolving

  module ObjectResolving
    class ResolveError < Error; end
    def resolve_object name
      name.to_s.split(MODULE_SEP).inject(Object){|m, n| m.const_get(n)}
    rescue ::Exception => err
      raise ResolveError, "cannot resolve #{name.inspect}: #{err.inspect}", err.backtrace
    end
  end

Generic retry behavior

  module RetryBehavior
    # Maximum trys.
    attr_accessor :try_max
    # Initial amount of seconds to sleep between each try.
    attr_accessor :try_sleep
    # Amount of seconds to increment sleep between each try.
    attr_accessor :try_sleep_increment
    # Maxinum amount of seconds to sleep between each try.
    attr_accessor :try_sleep_max

    # Yields:
    #   :try, n_try
    #   :rescue, exc
    #   :retry, exc
    #   :failed, nil
    def with_retry
      n_try = 0
      sleep_secs = try_sleep
      result = done = last_exception = nil
      begin
        n_try += 1
        result = yield :try, n_try
        done = true
      rescue *Error::Unforwardable.unforwardable => exc
        raise
      rescue ::Exception => exc
        last_exception = exc
        yield :rescue, exc
        if ! try_max || try_max > n_try
          yield :retry, exc
          if sleep_secs
            sleep sleep_secs if sleep_secs > 0
            sleep_secs += try_sleep_increment if try_sleep_increment
            sleep_secs = try_sleep_max if try_sleep_max && sleep_secs > try_sleep_max
          end
          retry
        end
      end
      unless done
        unless yield :failed, last_exception
          exc = last_exception
          raise RetryError, "Retry failed: #{exc.inspect}", exc.backtrace
        end
      end
      result
    end
    class RetryError < Error; end
  end # module

Beanstalk Transport

    class Beanstalk < TcpSocket
      LINE_TERMINATOR = "\r\n".freeze

      attr_accessor :tube, :priority, :delay, :ttr

      def initialize *args
        @port ||= 11300
        @tube ||= 'asir'
        @priority ||= 0
        @delay ||= 0
        @ttr ||= 600
        super
      end

      # Sends the encoded Message payload String.
      def _send_message message, message_payload; ...; end 
      # Receives the encoded Message payload String.
      def _receive_message channel, additional_data; ...; end 
      # Sends the encoded Result payload String.
      def _send_result message, result, result_payload, channel, stream; ...; end 
      # Receives the encoded Result payload String.
      def _receive_result message, opaque_result; ...; end 
      # Sets beanstalk_delay if message.delay was specified.
      def relative_message_delay! message, now = nil; ...; end 
      # Beanstalk protocol support
      def _beanstalk stream, message, expect, payload = nil; ...; end 
      # Beanstalk Server
      def _server!; ...; end 

Sets beanstalk_delay if message.delay was specified.

    class Beanstalk < TcpSocket
      def relative_message_delay! message, now = nil
        if delay = super
          message[:beanstalk_delay] = delay.to_i
        end
        delay
      end
    end

Beanstalk protocol support

Send “something …\r\n”.
Expect /\ASOMETHING (\d+)…\r\n".

    class Beanstalk < TcpSocket
      def _beanstalk stream, message, expect, payload = nil
        _log { [ :_beanstalk, :message, message ] } if @verbose >= 3
        stream.write message
        if payload
          stream.write payload
          stream.write LINE_TERMINATOR
        end
        stream.flush
        if match = _read_line_and_expect!(stream, expect)
          _log { [ :_beanstalk, :result, match[0] ] } if @verbose >= 3
        end
        match
      end

      def _after_connect! stream
        if @tube
          _beanstalk(stream,
                     "use #{@tube}\r\n",
                     /\AUSING #{@tube}\r\n\Z/)
        end
      end
    end

Beanstalk Server

    class Beanstalk < TcpSocket
      def _server!
        _log { "_server! #{uri}" } if @verbose >= 1
        @server = connect!(:try_max => nil,
                           :try_sleep => 1,
                           :try_sleep_increment => 0.1,
                           :try_sleep_max => 10) do | stream |
          if @tube
            _beanstalk(stream, 
                       "watch #{@tube}\r\n",
                       /\AWATCHING (\d+)\r\n\Z/)
          end
        end
        self
      end

      def _server_accept_connection! server
        prepare_server! unless @server
        [ @server, @server ]
      end

      def _server_close_connection! in_stream, out_stream
        # NOTHING
      end

      def stream_eof? stream
        # Note: stream.eof? on a beanstalkd connection,
        # will cause blocking read *forever* because
        # beanstalk connections are long lived.
        false
      end

      def _start_conduit!
        addr = address ? "-l #{address} " : ""
        cmd = "beanstalkd #{addr}-p #{port}"
        exec(cmd)
      end
    end
    end

A Transport composed of other Transports.

Classes that mix this in should define #send_message(message, messagepayload).

    module Composite
      include Delegation

      # Enumerable of Transport objects.
      attr_accessor :transports
      # If true, continue with other Transports when Transport#send_message raises an Exception.
      attr_accessor :continue_on_exception
    end

Connection-Oriented Transport

    class ConnectionOriented < Stream
      include PayloadIO, UriConfig

      # Returns a connected Channel.
      def stream; ...; end 
      # Sends the encoded Message payload String.
      def _send_message message, message_payload; ...; end 
      # Receives the encoded Message payload String.
      def _receive_message stream, additional_data; ...; end 
      # Sends the encoded Result payload String.
      def _send_result message, result, result_payload, stream, message_state; ...; end 
      # Receives the encoded Result payload String.
      def _receive_result message, opaque_result; ...; end 
      # Server
      def prepare_server!; ...; end 

Returns a connected Channel.

    class ConnectionOriented < Stream
      def stream
        @stream ||=
          connect!
      end

      # Yields Channel after _connect!.
      def connect!(opts = nil, &blk)
        base_opts = {
          :on_connect => lambda { | channel |
            connection = _connect!
            blk.call(connection) if blk
            connection
          }
        }
        base_opts.update(opts) if opts
        Channel.new(base_opts)
      end

      # Returns raw client stream.
      def _connect!
        _log { "_connect! #{uri}" } if @verbose >= 1
        stream = _client_connect!
        _log { "_connect! stream=#{stream}" } if @verbose >= 1
        _after_connect! stream
        stream
      rescue ::Exception => err
        raise err.class, "Cannot connect to #{self.class} #{uri}: #{err.inspect}", err.backtrace
      end

      # Subclasses can override.
      def _after_connect! stream
        self
      end

      # Subclasses can override.
      def _before_close! stream
        self
      end
    end

Sends the encoded Message payload String.

    class ConnectionOriented < Stream
      def _send_message message, message_payload
        stream.with_stream! do | io |
          _write message_payload, io
        end
      end
    end

Receives the encoded Message payload String.

    class ConnectionOriented < Stream
      def _receive_message stream, additional_data
        [ _read(stream), nil ]
      end
    end

Sends the encoded Result payload String.

    class ConnectionOriented < Stream
      def _send_result message, result, result_payload, stream, message_state
        unless @one_way || message.one_way
          _write result_payload, stream
        end
      end
    end

Receives the encoded Result payload String.

    class ConnectionOriented < Stream
      def _receive_result message, opaque_result
        unless @one_way || message.one_way
          stream.with_stream! do | io |
            _read io
          end
        end
      end
    end

Server

    class ConnectionOriented < Stream
      def prepare_server!
        _log { "prepare_server! #{uri}" } if @verbose >= 1
        _server!
      rescue ::Exception => err
        _log [ "prepare_server! #{uri}", :exception, err ]
        raise err.class, "Cannot prepare server on #{self.class} #{uri}: #{err.inspect}", err.backtrace
      end

      def run_server!
        _log { "run_server! #{uri}" } if @verbose >= 1
        with_server_signals! do
          @running = true
          server_on_start!
          while @running
            serve_connection!
          end
        end
        self
      ensure
        server_on_stop!
        _server_close!
      end

      def server_on_start!
      end

      def server_on_stop!
      end

      def serve_connection!
        in_stream, out_stream = _server_accept_connection! @server
        _log { "serve_connection!: connected #{in_stream} #{out_stream}" } if @verbose >= 1
        _server_serve_stream! in_stream, out_stream
      rescue Error::Terminate => err
        @running = false
        _log [ :serve_connection_terminate, err ]
      ensure
        _server_close_connection!(in_stream, out_stream)
        _log { "serve_connection!: disconnected" } if @verbose >= 1
      end
      alias :_server_serve_stream! :_serve_stream!

      def _server!
        raise Error::SubclassResponsibility, "_server!"
      end

      def _server_close!
        if @server
          @server.close rescue nil
        end
        @server = nil
        self
      end

      # Accept a client connection.
      # Returns [ in_stream, out_stream ].
      def _server_accept_connection! server
        raise Error::SubclassResponsibility, "_server_accept_connection!"
      end

      # Close a client connection.
      def _server_close_connection! in_stream, out_stream
        raise Error::SubclassResponsibility, "_server_close_connection!"
      end
    end
    end

A Transport that delgated to one or more other Transports.

Classes that include this must define #send_message(message, messagepayload).

    module Delegation
      # If true, reraise the first Exception that occurred during Transport#send_message.
      attr_accessor :reraise_first_exception

      # Proc to call(transport, message, exc) when a delegated #send_message fails.
      attr_accessor :on_send_message_exception

      # Proc to call(transport, message) when #send_message fails with no recourse.
      attr_accessor :on_failed_message

      # Return the subTransports#send_message result unmodified from #_send_message.
      def _receive_result message, opaque_result
        opaque_result
      end

      # Return the subTransports#send_message result unmodified from #_send_message.
      def receive_result message, opaque_result
        opaque_result
      end

      def needs_message_identifier? message
        @needs_message_identifier ||
          transports.any? { | t | t.needs_message_identifier?(message) }
      end

      def needs_message_timestamp? message
        @needs_message_timestamp ||
          transports.any? { | t | t.needs_message_timestamp?(message) }
      end

      # Subclasses with multiple transports should override this method.
      def transports
        @transports ||= [ transport ]
      end

      # Called from within _send_message rescue.
      def _handle_send_message_exception! transport, message, exc
        _log { [ :send_message, :transport_failed, exc ] }
        (message[:transport_exceptions] ||= [ ]) << "#{exc.inspect}"
        @on_send_message_exception.call(self, message, exc) if @on_send_message_exception
        self
      end
    end

Select a Transport based on a Proc.

    class Demux < self
      include Delegation

      # Proc returning actual transport to use.
      # transport_proc.call(transport, message)
      attr_accessor :transport_proc

      # Only active during #send_message.
      attr_accessor_thread :transport

      # Support for Delegation mixin.
      def transports; [ transport ]; end

      def send_message message
        with_attr! :transport, transport_proc.call(self, message) do
          super
        end
      end

      def _send_message message, message_payload
        transport.send_message(message)
      end
    end

Rack Transport

    class Rack < HTTP
      # Receive the Message payload String from the Rack::Request object.
      # Returns the [ Rack::Request, Rack::Response ] as the message_state.
      def _receive_message rack_req_res, additional_data
        body = rack_req_res.first.body.read
        [ body, rack_req_res ]
      end

      # Send the Result payload String in the Rack::Response object as application/binary.
      def _send_result message, result, result_payload, rack_rq_rs, message_state
        rack_response = rack_rq_rs[1]
        rack_response[CONTENT_TYPE] = APPLICATION_BINARY
        rack_response.write result_payload
      end

      # Constructs a Rackable App from this Transport.
      def rack_app &blk
        App.new(self, &blk)
      end

      # Rack Transport Application.
      class App
        def initialize transport = nil, &blk
          @app = transport
          instance_eval &blk if blk
        end

        def call env
          @app.call(env)
        end
      end

      # Rack application handler.
      def call(env)
        rq = ::Rack::Request.new(env)
        rs = ::Rack::Response.new
        rack_rq_rs = [ rq, rs ]
        serve_message! rack_rq_rs, rack_rq_rs
        rs.finish # => [ status, header, rbody ]
      end

      ###############################
      # Dummy server.

      def prepare_server! opts = { }
        self
      end

      # WEBrick under Rack.
      def run_server!
        #require 'rack/handler'
        u = URI.parse(uri); port = u.port # <= REFACTOR
        ::Rack::Handler::WEBrick.run \
          ::Rack::ShowExceptions.new(::Rack::Lint.new(self.rack_app)),
          :Port => port
        self
      end

      def stop_server!
        # NOT IMPLEMENTED
        self
      end
    end

Resque Transport

    class Resque < ConnectionOriented
      include PollThrottle

      attr_accessor :queues, :queue, :namespace, :throttle

      def initialize *args
        @port_default = 6379
        @scheme_default = 'redis'.freeze
        super
        self.one_way = true
        # Reraise exception, let Resque::Worker handle it.
        @on_exeception ||= lambda do | trans, exc, type, message |
          raise exc, exc.backtrace
        end
      end

      # Resque client.
      def _client_connect!; ...; end 
      # Resque server (worker).
      def _server!; ...; end 

Resque client.

    class Resque < ConnectionOriented
      def _client_connect!
        # $stderr.puts "  #{$$} #{self} _client_connect!"
        resque_connect!
      rescue ::Exception => exc
        raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
      end
    end

Resque server (worker).

    class Resque < ConnectionOriented
      def _server!
        resque_connect!
        resque_worker
      rescue ::Exception => exc
        raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
      end

      def _receive_result message, opaque_result
        return nil if one_way || message.one_way
        super
      end

      def _send_result message, result, result_payload, stream, message_state
        return nil if one_way || message.one_way
        super
      end

      def _send_message message, message_payload
        stream.with_stream! do | io |  # Force connect
          $stderr.puts "  #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2
          ::Resque.enqueue_to(queue, self.class, message_payload)
        end
      end

      def queues
        @queues ||=
          (
          x = nil
          x = path if @uri
          x ||= ""
          root, x = x.split('/')
          x ||= ""
          x = x.split(/(\s+|\s*,\s*)/)
          x.each(&:freeze)
          x.freeze
          )
      end

      # Defaults to [ 'asir' ].
      def queues_
        @queues_ ||=
          queues.empty? ? [ DEFAULT_QUEUE ] : queues.freeze
      end

      # Defaults to 'asir'.
      def queue
        @queue ||= queues_.first || DEFAULT_QUEUE
      end

      # Defaults to 'asir'.
      def namespace_
        @namespace_ ||= namespace || DEFAULT_QUEUE
      end

      DEFAULT_QUEUE = 'asir'.freeze

      def _server_accept_connection! server
        [ server, server ]
      end

      # Resque is message-oriented, process only one message per "connection".
      def stream_eof? stream
        false
      end

      # Nothing to be closed for Resque.
      def _server_close_connection! in_stream, out_stream
        # NOTHING
      end

      def serve_stream_message! in_stream, out_stream # ignored
        save = Thread.current[:asir_transport_resque_instance]
        Thread.current[:asir_transport_resque_instance] = self
        poll_throttle throttle do
          # $stderr.puts "  #{self} resque_worker = #{resque_worker} on queues #{resque_worker.queues}"
          if job = resque_worker.reserve
            $stderr.puts "  #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2
            resque_worker.process(job)
          end
          job
        end
        self
      ensure
        Thread.current[:asir_transport_resque_instance] = save
      end

      # Class method entry point from Resque::Job.perform.
      def self.perform payload
        # $stderr.puts "  #{self} process_job payload=#{payload.inspect}"
        t = Thread.current[:asir_transport_resque_instance]
        # Pass payload as in_stream; _receive_message will return it.
        t.serve_message! payload, nil
      end

      def _receive_message payload, additional_data # is actual payload
        # $stderr.puts "  #{self} _receive_message payload=#{payload.inspect}"
        [ payload, nil ]
      end

      ####################################

      def resque_uri
        @resque_uri ||=
          (
          unless scheme == 'redis'
            raise ArgumentError, "Invalid resque URI: #{uri.inspect}"
          end
          _uri
          )
      end

      def resque_connect!
        @redis =
          ::Redis.new({
                        :host => address || '127.0.0.1',
                        :port => port,
                        :thread_safe => true,
                      })
        if namespace_
          ::Resque.redis =
            @redis =
            ::Redis::Namespace.new(namespace_, :redis => @redis)
          ::Resque.redis.namespace = namespace_
        else
          ::Resque.redis = @redis
        end
        # $stderr.puts "  *** #{$$} #{self} resque_connect! #{@redis.inspect}"
        @redis
      end

      def resque_disconnect!
        ::Resque.redis = nil
      end

      def resque_worker
        @resque_worker ||= ::Resque::Worker.new(queues_)
      end

      def server_on_start!
        if worker = resque_worker
          worker.prune_dead_workers
          worker.register_worker
        end
        self
      end

      def server_on_stop!
        if worker = @resque_worker
          worker.unregister_worker
        end
        self
      end

      #########################################

      def _start_conduit!
        @redis_dir ||= "/tmp"
        @redis_conf ||= "#{@redis_dir}/asir-redis-#{port}.conf"
        @redis_log ||= "#{@redis_dir}/asir-redis-#{port}.log"
        ::File.open(@redis_conf, "w+") do | out |
          out.puts "daemonize no"
          out.puts "port #{port}"
          out.puts "loglevel warning"
          out.puts "logfile #{@redis_log}"
        end
        exec "redis-server", @redis_conf
      end
    end
    end

Retry Transport

    class Retry < self
      include Delegation, RetryBehavior

      # The transport to delegate to.
      attr_accessor :transport
      # Proc to call(transport, message) before retry.
      attr_accessor :before_retry

      def _send_message message, message_payload
        first_exception = nil
        with_retry do | action, data |
          case action
          when :try
            transport.send_message(message)
          when :rescue #, exc
            first_exception ||= data
            _handle_send_message_exception! transport, message, data
          when :retry #, exc
            before_retry.call(self, message) if before_retry
          when :failed
            @on_failed_message.call(self, message) if @on_failed_message
            if first_exception && @reraise_first_exception
              $! = first_exception
              raise
            end
            nil # fallback to raise RetryError
          end
        end
      end
    end

WEBrick Transport server.

    class Webrick < HTTP

      # Server-side: WEBrick

      # Receive the Message payload String from the HTTP Message object.
      # Returns the original http_message as the message_state.
      def _receive_message http_message, additional_data
        [ http_message.body, http_message ]
      end

      # Send the Result payload String in the HTTP Response object as application/binary.
      def _send_result message, result, result_payload, http_result, message_state
        http_result[CONTENT_TYPE] = APPLICATION_BINARY
        http_result.body = result_payload
      end

      def prepare_server! opts = { }
        u = URI.parse(uri)
        port = u.port
        path = u.path
        opts[:Port] ||= port
        @server = ::WEBrick::HTTPServer.new(opts)
        @server.mount_proc path, lambda { | rq, rs |
          serve_message! rq, rs
        }
        self
      rescue ::Exception => exc
        raise Error, "Webrick Server #{uri.inspect}: #{exc.inspect}", exc.backtrace
      end

      def run_server!
        @server.start
        self
      end

      def stop_server!
        @server.stop
        self
      end
    end

ZeroMQ Transport

    class Zmq < ConnectionOriented
      attr_accessor :queue

      # 0MQ client.
      def _client_connect!; ...; end 
      # 0MQ server.
      def _server!; ...; end 

0MQ client.

    class Zmq < ConnectionOriented
      def _client_connect!
        sock = zmq_context.socket(one_way ? ZMQ::PUB : ZMQ::REQ)
        sock.connect(zmq_uri)
        sock
      rescue ::Exception => exc
        raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
      end
    end

0MQ server.

    class Zmq < ConnectionOriented
      def _server!
        sock = zmq_context.socket(one_way ? ZMQ::SUB : ZMQ::REP)
        sock.setsockopt(ZMQ::SUBSCRIBE, queue) if one_way
        sock.bind("tcp://*:#{port}") # WTF?: why doesn't tcp://localhost:PORT work?
        @server = sock
      rescue ::Exception => exc
        raise exc.class, "#{self.class} #{zmq_uri}: #{exc.message}", exc.backtrace
      end

      def _receive_result message, opaque_result
        return nil if one_way || message.one_way
        super
      end

      def _send_result message, result, result_payload, stream, message_state
        return nil if one_way || message.one_way
        super
      end

      def _write payload, stream
        payload.insert(0, queue_) if one_way
        stream.send payload, 0
        stream
      end

      def _read stream
        stream.recv 0
      end

      def queue
        @queue ||=
          (
          case
          when @uri
            x = URI.parse(@uri).path
          else
            x = ""
          end
          # x << "\t" unless x.empty?
          x.freeze
          )
      end
      def queue_
        @queue_ ||=
          (queue.empty? ? queue : queue + " ").freeze
      end

      # server represents a receiving ZMQ endpoint.
      def _server_accept_connection! server
        [ server, @one_way ? nil : server ]
      end

      # ZMQ is message-oriented, process only one message per "connection".
      alias :_server_serve_stream :serve_message!

      def stream_eof? stream
        false
      end

      # Nothing to be closed for ZMQ.
      def _server_close_connection! in_stream, out_stream
        # NOTHING
      end

      def zmq_uri
        @zmq_uri ||=
          (
          u = URI.parse(uri)
          u.path = ''
          u.to_s.freeze
          )
      end

      def zmq_context
        @@zmq_context ||=
          ZMQ::Context.new(1)
      end
      @@zmq_context ||= nil
    end
    end

Serve a Message.

  class Transport
    def serve_message! in_stream, out_stream
      message = message_state = message_ok = result = result_ok = nil
      exception = original_exception = unforwardable_exception = nil
      message, message_state = receive_message(in_stream)
      if message
        message_ok = true
        result = invoke_message!(message)
        result_ok = true
        self
      else
        nil
      end
    rescue ::Exception => exc
      exception = original_exception = exc
      _log [ :message_error, exc ]
      @on_exception.call(self, exc, :message, message, nil) if @on_exception
    ensure
      begin
        if message_ok
          if exception && ! result_ok
            case exception
            when *Error::Unforwardable.unforwardable
              unforwardable_exception = exception = Error::Unforwardable.new(exception)
            end
            result = Result.new(message, nil, exception)
          end
          if out_stream
            send_result(result, out_stream, message_state)
          end
        end
      rescue ::Exception => exc
        _log [ :result_error, exc ]
        @on_exception.call(self, exc, :result, message, result) if @on_exception
      end
      raise original_exception if unforwardable_exception
    end
  end

Transport Server Support

  class Transport
    attr_accessor :running

    def stop! force = false
      @running = false
      stop_server! if respond_to?(:stop_server!)
      raise Error::Terminate if force
      self
    end

    def with_server_signals!
      old_trap = { }
      [ "TERM", "HUP" ].each do | sig |
        trap = proc do | *args |
          stop!
          @signal_exception = ::ASIR::Error::Terminate.new("#{self} by SIG#{sig} #{args.inspect} in #{__FILE__}:#{__LINE__}")
        end
        old_trap[sig] = Signal.trap(sig, trap)
      end
      yield
      if exc = @signal_exception
        @signal_exception = nil
        raise exc
      end
    ensure
      # $stderr.puts "old_trap = #{old_trap.inspect}"
      old_trap.each do | sig, trap |
        Signal.trap(sig, trap) rescue nil
      end
    end
  end

Asynchronous beanstalkd service

require 'example_helper'
require 'asir/transport/beanstalk'
require 'asir/coder/zlib'
begin
  Email.asir.transport = t =
    ASIR::Transport::Beanstalk.new(:address => '127.0.0.1', :port => 30904)
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
                           [ ASIR::Coder::Marshal.new,
                            ASIR::Coder::Zlib.new, ])
  t.start_conduit!; sleep 1
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com", :customer => @customer)
  sleep 2
  server_process do
    t.prepare_server!
    t.run_server!
  end
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close; sleep 3; server_kill; sleep 2
  t.stop_conduit!
end

Synchronous HTTP service

require 'example_helper'
require 'asir/transport/webrick'
require 'asir/coder/base64'
require 'asir/coder/zlib'
begin
  Email.asir.transport = t =
    ASIR::Transport::Webrick.new(:uri => "http://localhost:31913/")
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
                           [ASIR::Coder::Marshal.new,
                            ASIR::Coder::Base64.new, ])
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com",
                             :customer => @customer)
  sleep 2
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close rescue nil; sleep 3
  server_kill
end

Synchronous HTTP service on instance methods.

require 'example_helper'
require 'asir/transport/webrick'
require 'asir/coder/base64'
begin
  MyClass.asir.transport = t =
    ASIR::Transport::Webrick.new(:uri => "http://localhost:30914/")
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
                           [ ASIR::Coder::Marshal.new,
                             ASIR::Coder::Base64.new, ])
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr MyClass.new("abc123").asir.size
  sleep 2
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close; sleep 3; server_kill
end

Local Process with delay option.

require 'example_helper'

pr DelayedService.asir.
  _configure{|req, p| req.delay = 5}.
  do_it(Time.now)

Asynchronous beanstalkd service with delay option

require 'example_helper'
require 'asir/transport/beanstalk'
begin
  DelayedService.asir.transport = t =
    ASIR::Transport::Beanstalk.new(:address => '127.0.0.1', :port => 30916)
  t.encoder =
    ASIR::Coder::Marshal.new
  t.start_conduit!; sleep 1
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr DelayedService.asir.
    _configure{|req, p| req.delay = 5}.
    do_it(Time.now)
  sleep 10
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close; sleep 1
  server_kill; sleep 1
  t.stop_conduit!
end

Buffered asynchronous beanstalkd service with delay option

require 'example_helper'
require 'asir/transport/beanstalk'
require 'asir/transport/buffer'
begin
  t =
    ASIR::Transport::Beanstalk.new(:address => '127.0.0.1', :port => 30917)
  t.encoder =
    ASIR::Coder::Marshal.new
  t.start_conduit!; sleep 1
  DelayedService.asir.transport =
    t0 = ASIR::Transport::Buffer.new(:transport => t)
  t0.pause!
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr [ :paused?, t0.paused?, :at, Time.now.iso8601(2) ]
  pr DelayedService.asir.
    _configure{|req, p| req.delay = 5}.
    do_it(Time.now)
  sleep 2
  pr [ :resuming, :size, t0.size, :at, Time.now.iso8601(2) ]
  t0.resume!
  pr [ :paused?, t0.paused?, :size, t0.size, :at, Time.now.iso8601(2) ]
  pr [ :resumed, :size, t0.size, :at, Time.now.iso8601(2) ]
  sleep 7
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close; sleep 1; server_kill; sleep 1
  t.stop_conduit!
end

Socket service with retry.

require 'example_helper'
require 'asir/transport/retry'
begin
  File.unlink(service_log = "#{__FILE__}.service.log") rescue nil
  file = ASIR::Transport::File.new(:file => service_log,
                                   :encoder => ASIR::Coder::Yaml.new)
  tcp = ASIR::Transport::TcpSocket.new(:port => 31918,
                                       :encoder => ASIR::Coder::Marshal.new)
  start_server_proc = lambda do | transport, message |
    $stderr.puts "message = #{message.inspect}"
    file.send_message(message)
    server_process do
      tcp.prepare_server!
      tcp.run_server!
    end
  end
  Email.asir.transport = t =
    ASIR::Transport::Retry.new(:transport => tcp,
                               :try_sleep => 1,
                               :try_sleep_increment => 2,
                               :try_max => 3,
                               :before_retry => start_server_proc
                               )
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com", :customer => 123)
  sleep 1
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user2@email.com", :customer => 456)
  sleep 1
rescue ::Exception => err
  $stderr.puts "ERROR: #{err.inspect}\n  #{err.backtrace * "\n  "}"
  raise
ensure
  file.close rescue nil;
  tcp.close rescue nil; sleep 1
  server_kill
  puts "\x1a\n#{service_log.inspect} contents:"
  puts File.read(service_log)
end

Socket service with unsafe Exception.

require 'example_helper'
begin
  tcp = ASIR::Transport::TcpSocket.new(:port => 31919,
                                       :encoder => ASIR::Coder::Marshal.new)
  server_process do
    tcp.prepare_server!
    tcp.run_server!
  end
  UnsafeService.asir.transport = t = tcp
  pr UnsafeService.asir.do_it("exit 999; :ok")
  sleep 1
rescue ::ASIR::Error::Unforwardable => err
  $stderr.puts "### #{$$}: Unforwardable ERROR: #{err.inspect}}"
rescue ::Exception => err
  $stderr.puts "### #{$$}: ERROR: #{err.inspect}\n  #{err.backtrace * "\n  "}"
  raise
ensure
  file.close rescue nil;
  tcp.close rescue nil; sleep 1
  server_kill
#  puts "\x1a\n#{service_log.inspect} contents:"
#  puts File.read(service_log)
end

One-way ZMQ service.

require 'example_helper'
require 'asir/transport/zmq'
begin
  zmq = ASIR::Transport::Zmq.new(:port => 31920,
                                 :encoder => ASIR::Coder::Marshal.new,
                                 :one_way => true)
  server_process do
    zmq.prepare_server!
    zmq.run_server!
  end
  UnsafeService.asir.transport = t = zmq
  pr UnsafeService.asir.do_it(":ok")
rescue ::Exception => err
  $stderr.puts "### #{$$}: ERROR: #{err.inspect}\n  #{err.backtrace * "\n  "}"
  raise
ensure
  zmq.close rescue nil; sleep 1; server_kill
end

Bi-directional ZMQ service.

require 'example_helper'
require 'asir/transport/zmq'
begin
  zmq = ASIR::Transport::Zmq.new(:port => 31920,
                                 :encoder => ASIR::Coder::Marshal.new,
                                 :one_way => false)
  server_process do
    zmq.prepare_server!
    zmq.run_server!
  end
  UnsafeService.asir.transport = t = zmq
  pr UnsafeService.asir.do_it(":ok")
rescue ::Exception => err
  $stderr.puts "### #{$$}: ERROR: #{err.inspect}\n  #{err.backtrace * "\n  "}"
  raise
ensure
  zmq.close rescue nil; sleep 1; server_kill
end

In-core, in-process service, with continuation block.

require 'example_helper'
pr(Email.asir.send_email(:pdf_invoice,
                           :to => "user@email.com",
                           :customer => @customer,
                           &proc { | res | pr [ :in_block, res.result ] })
)

Synchronous HTTP service on Rack under WEBrick

require 'example_helper'
gem 'rack'
require 'asir/transport/rack'
require 'asir/coder/base64'
begin
  Email.asir.transport = t =
    ASIR::Transport::Rack.new(:uri => "http://localhost:31924/")
  t.encoder =
    ASIR::Coder::Chain.new(:encoders =>
                           [ASIR::Coder::Marshal.new,
                            ASIR::Coder::Base64.new, ])
  server_process do
    t.prepare_server!
    t.run_server!
  end
  pr Email.asir.send_email(:pdf_invoice,
                             :to => "user@email.com",
                             :customer => @customer)
  sleep 2
rescue Object => err
  $stderr.puts "#{err.inspect}\n#{err.backtrace * "\n"}"
ensure
  t.close rescue nil; sleep 3
  server_kill; sleep 2
end

One-way Resque service.

require 'rubygems'
gem 'resque'
require 'example_helper'
require 'asir/transport/resque'
begin
  t = ASIR::Transport::Resque.new(:port => 31925,
                              :encoder => ASIR::Coder::Marshal.new)
  # Control throttling of Resque::Worker inside ASIR::Transport::Resque
  t.throttle = {
    # :verbose => true,
    :min_sleep => 0.0,
    :max_sleep => 2.0,
    :inc_sleep => 0.1,
    :mul_sleep => 1.5,
    :rand_sleep => 0.1,
  }
  t.start_conduit!; sleep 1
  server_process do
    t.prepare_server!
    t.run_server!
  end
  UnsafeService.asir.transport = t
  pr UnsafeService.asir.do_it(":ok")
rescue ::Exception => err
  $stderr.puts "### #{$$}: ERROR: #{err.inspect}\n  #{err.backtrace * "\n  "}"
  raise
ensure
  sleep 5
  t.close rescue nil; sleep 1; server_kill
  t.stop_conduit!
end

Sample Object Instance Client

class MyClass
  include ASIR::Client
  def initialize x
    @x = x
  end
  def method_missing sel, *args
    @x.send(sel, *args)
  end
end

Asynchronous beanstalkd service – Output

*** 10562: client process
*** 10562: pr: nil
*** 10565: server process
*** 10565: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}

          
        

Synchronous HTTP service – Output

*** 10577: client process
*** 10579: server process
[2012-07-18 14:47:56] INFO  WEBrick 1.3.1
[2012-07-18 14:47:56] INFO  ruby 1.8.7 (2012-02-08) [i686-darwin10]
[2012-07-18 14:47:56] INFO  WEBrick::HTTPServer#start: pid=10579 port=31913
*** 10579: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
localhost - - [18/Jul/2012:14:47:57 CDT] "POST / HTTP/1.1" 200 94
- -> /
*** 10577: pr: :ok

          
        

Synchronous HTTP service on instance methods. – Output

*** 10582: client process
*** 10584: server process
[2012-07-18 14:48:02] INFO  WEBrick 1.3.1
[2012-07-18 14:48:02] INFO  ruby 1.8.7 (2012-02-08) [i686-darwin10]
[2012-07-18 14:48:02] INFO  WEBrick::HTTPServer#start: pid=10584 port=30914
localhost - - [18/Jul/2012:14:48:03 CDT] "POST / HTTP/1.1" 200 94
- -> /
*** 10582: pr: 6

          
        

Local Process with delay option. – Output

*** 10586: client process
DelayedService.do_it(2012-07-18T14:48:09-05:00) dt=5.000131 :ok
DelayedService.do_it => :ok
*** 10586: pr: :ok

          
        

Asynchronous beanstalkd service with delay option – Output

*** 10589: client process
*** 10592: server process
*** 10589: pr: nil
DelayedService.do_it(2012-07-18T14:48:16-05:00) dt=5.002611 :ok
DelayedService.do_it => :ok

Buffered asynchronous beanstalkd service with delay option – Output

*** 10595: client process
*** 10598: server process
*** 10595: pr: [:paused?, true, :at, "2012-07-18T14:48:30.87-05:00"]
*** 10595: pr: nil
*** 10595: pr: [:resuming, :size, 1, :at, "2012-07-18T14:48:32.87-05:00"]
*** 10595: pr: [:paused?, false, :size, 0, :at, "2012-07-18T14:48:32.87-05:00"]
*** 10595: pr: [:resumed, :size, 0, :at, "2012-07-18T14:48:32.87-05:00"]
DelayedService.do_it(2012-07-18T14:48:30-05:00) dt=5.000379 :ok
DelayedService.do_it => :ok

Socket service with retry. – Output

*** 10601: client process
RETRY: 1: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 2: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 3: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 4: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 5: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\

MORE

Socket service with retry. – Output – Page 2

ion refused - connect(2)>>
RETRY: 6: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 7: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 8: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
RETRY: 9: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect to \
ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connect\
ion refused - connect(2)>>
FAILED: 10: ERROR : #<Errno::ECONNREFUSED: Connection refused - Cannot connect t\
o ASIR::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Conne\

MORE

Socket service with retry. – Output – Page 3

ction refused - connect(2)>>
  10601 ASIR::Transport::Retry :send_message, :transport_failed, #<Errno::ECONNR\
EFUSED: Connection refused - Cannot connect to ASIR::Transport::TcpSocket tcp://\
127.0.0.1:31918: #<Errno::ECONNREFUSED: Connection refused - connect(2)>>
message = #<ASIR::Message:0x10253bfd0 @one_way=nil, @additional_data={:transport\
_exceptions=>["#<Errno::ECONNREFUSED: Connection refused - Cannot connect to ASI\
R::Transport::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connection\
 refused - connect(2)>>"]}, @arguments=[:pdf_invoice, {:to=>"user@email.com", :c\
ustomer=>123}], @receiver=Email, @receiver_class=Module, @selector=:send_email>
*** 10603: server process
*** 10603: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10601: pr: :ok
*** 10603: Email.send_mail :pdf_invoice {:to=>"user2@email.com", :customer=>456}
*** 10601: pr: :ok

MORE

Socket service with retry. – Output – Page 4

".riterate/ex18.rb-1-capture.rb.service.log" contents:
# asir_payload_size: 388
--- !ruby/object:ASIR::Message 
additional_data: 
  :transport_exceptions: 
  - "#<Errno::ECONNREFUSED: Connection refused - Cannot connect to ASIR::Transpo\
rt::TcpSocket tcp://127.0.0.1:31918: #<Errno::ECONNREFUSED: Connection refused -\
 connect(2)>>"
arguments: 
- :pdf_invoice
- :to: user@email.com
  :customer: 123
one_way: 
receiver: Email
receiver_class: Module

MORE

Socket service with retry. – Output – Page 5

selector: :send_email

# asir_payload_end

Socket service with unsafe Exception. – Output

*** 10605: client process
*** 10607: server process
### 10605: Unforwardable ERROR: #<ASIR::Error::Unforwardable: SystemExit (eval):\
1:in `exit': exit>}

One-way ZMQ service. – Output

*** 10609: client process
*** 10611: server process
*** 10609: pr: nil
10611: UnsafeService.do_it(:ok) :ok
10611: UnsafeService.do_it => :ok

Bi-directional ZMQ service. – Output

*** 10624: client process
*** 10626: server process
10626: UnsafeService.do_it(:ok) :ok
10626: UnsafeService.do_it => :ok
*** 10624: pr: :ok

          
        

In-core, in-process service, with continuation block. – Output

*** 10628: client process
*** 10628: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10628: pr: [:in_block, :ok]
*** 10628: pr: :ok

          
        

Subprocess service with continuation – Output

*** 10631: client process
*** 10631: pr: nil
*** 10633: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
*** 10633: pr: [:in_block, :ok]

          
        

Synchronous HTTP service on Rack under WEBrick – Output

*** 10635: client process
*** 10637: server process
[2012-07-18 14:49:00] INFO  WEBrick 1.3.1
[2012-07-18 14:49:00] INFO  ruby 1.8.7 (2012-02-08) [i686-darwin10]
[2012-07-18 14:49:00] INFO  WEBrick::HTTPServer#start: pid=10637 port=31924
*** 10637: Email.send_mail :pdf_invoice {:to=>"user@email.com", :customer=>123}
localhost - - [18/Jul/2012:14:49:01 CDT] "POST / HTTP/1.1" 200 94
- -> /
*** 10635: pr: :ok

          
        

One-way Resque service. – Output

*** 10639: client process
*** 10642: server process
*** 10639: pr: nil
10642: UnsafeService.do_it(:ok) :ok
10642: UnsafeService.do_it => :ok