Objective

  • Simplify inter-thread communication and management.
  • Provide a thread-safe Facade to object methods.
  • Allow objects to execute work safely in their own thread.
  • Handle results asynchronously.
  • Select Active Object Facade at run-time.
  • Simple API.

Ruby Pattern for Asynchronous Results

do_something_with(target.selector(*arguments))

# => 

target.selector(*arguments) do | result |
  do_something_with(result)
end

Implementation

  • ActiveObject::Facade – encapsulates the object that receives messages.
  • ActiveObject::Facade::Passive – passive facade delivers message immediately to current Thread.
  • ActiveObject::Facade::Active – active facade managing a Thread and a Queue of Messages.
  • ActiveObject::Facade::Active::Message – encapsulates a message sent to the facade for later execution by its thread.
  • ActiveObject::Mixin – module to mixin to existing classes to handle Facade creation/initialization.

ActiveObject module

# Top-level namespace of the Active Object implementation.
# Outline only: each "..." body is elided here and shown in its own section.
module ActiveObject
  # Generic API error.
  # NOTE(review): derives from ::Exception, so a bare `rescue` (which only
  # catches StandardError) will not catch it -- confirm this is intended.
  class Error < ::Exception; end

  # Logging
  module Logging; ...; end 
  # Facade
  class Facade; ...; end 
  # Facade Mixin
  module Mixin; ...; end 
end

Facade

Intercepts messages on behalf of the target object.
Subclasses of Facade handle delivery of message to the target object.

module ActiveObject
  # Base class of all facades: holds the target object on whose behalf
  # messages are intercepted. Outline only; "..." bodies are shown elsewhere.
  class Facade
    include Logging

    # Wrap +target+.
    # Logs the target's object_id, stores the target in @target and assigns
    # this facade to the target's _active_facade attribute, so the target
    # must respond to #_active_facade=.
    def initialize target
      _log { "target=@#{target.object_id}" }
      (@target = target)._active_facade = self
    end

    # Passive Facade
    class Passive < self; ...; end 
    # Active Facade
    class Active < self; ...; end 
    # Active Facade Support
    def _active_target; ...; end 

    # Distributor
    class Distributor < Passive; ...; end 
  end
end

Passive Facade

Immediately delegate to the target.

module ActiveObject
  class Facade
    # Facade that delivers every message to the target immediately.
    # Outline only; the "..." bodies are shown in the following sections.
    class Passive < self
      # Delegate message directly
      def method_missing selector, *arguments, &block; ...; end 
      # Passive Thread Management
      def _active_start!; ...; end 
    end
  end
end

Delegate message directly

Delegate messages immediately to @target.
Does not bother to construct a Message.

module ActiveObject
  class Facade
    class Passive < self
      # Deliver +selector+ with +arguments+ to @target right away, in the
      # calling Thread, without building a Message.
      #
      # The block is NOT forwarded to the target: it is the result callback.
      # When a block is given it is called with the target's return value and
      # the block's value is returned; otherwise nil is returned.
      def method_missing selector, *arguments, &block
        _log { "#{selector} #{arguments.inspect}" }
        result = @target.__send__(selector, *arguments)
        block ? block.call(result) : nil
      end

      # Keep respond_to? consistent with method_missing: the facade claims to
      # respond to whatever its target responds to.
      def respond_to_missing? selector, include_private = false
        @target.respond_to?(selector, include_private) || super
      end
    end
  end
end

Passive Thread Management

module ActiveObject
  class Facade
    class Passive < self
      # A passive facade has no worker Thread, so starting does nothing.
      # Returns the facade itself.
      def _active_start!; self; end

      # A passive facade has no worker Thread, so stopping does nothing.
      # Returns the facade itself.
      def _active_stop!; self; end
    end
  end
end

Facade Mixin

Glue Facade to including Class

module ActiveObject
  # Include into a class so that its .new returns a Facade wrapping the
  # freshly built instance (see ClassMethods#new).
  module Mixin
    # Hook run when the module is included into +target+.
    # Keeps the original constructor reachable on the class as
    # _new_without_active_facade, then extends the class with ClassMethods.
    def self.included target
      super
      # Alias on the singleton class so the *class-level* .new is the method
      # being aliased.
      class << target
        alias_method :_new_without_active_facade, :new
      end
      target.extend(ClassMethods)
    end

    # The Facade currently wrapping this instance; assigned by
    # Facade#initialize.
    attr_accessor :_active_facade
  end
end

Facade Interface

module ActiveObject
  module Mixin
    # Class-level behaviour added to every class that includes Mixin.
    module ClassMethods
      include Logging

      # Facade subclass used to wrap new instances of the including class.
      # When unset (nil), Facade::Passive is used.
      attr_accessor :active_facade

      # Replacement for the including class' .new: builds the real object
      # with the original constructor, wraps it in a Facade and returns the
      # Facade instead of the object.
      def new *arguments, &block
        _log { "arguments=#{arguments.inspect}" }
        instance = super(*arguments, &block)
        facade_class = active_facade || Facade::Passive
        wrapper = facade_class.new(instance)
        _log { "facade=@#{wrapper.object_id}" }
        wrapper
      end
    end
  end
end

Example

  • Two objects send messages back and forth to each other N times.
  • Mixin ActiveObject to each class.

class A

A#foo – Sends b.bar, until @counter == 0.

# Example participant: A#foo forwards the message to b.bar while its
# counter has not run out.
class A < Base
  attr_accessor :b

  # Logs the call; if the counter allowed another round, sends +msg+ to
  # b.bar (logging the result when it arrives) and sleeps one second.
  # Returns [ :a, @counter ].
  def foo msg
    _log { "msg=#{msg.inspect} @counter=#{@counter}" }
    return [ :a, @counter ] unless decrement_counter_or_stop

    b.bar(msg) { | result | _log { "result=#{result.inspect} " } }
    sleep(1)
    [ :a, @counter ]
  end
end

class B

B#bar – Sends a.foo, until @counter == 0.

# Example participant: B#bar forwards the message to a.foo while its
# counter has not run out.
class B < Base
  attr_accessor :a

  # Logs the call; if the counter allowed another round, sends +msg+ to
  # a.foo (logging the result when it arrives) and sleeps one second.
  # Returns [ :b, @counter ].
  def bar msg
    _log { "msg=#{msg.inspect} @counter=#{@counter}" }
    return [ :b, @counter ] unless decrement_counter_or_stop

    a.foo(msg) { | result | _log { "result=#{result.inspect} " } }
    sleep(1)
    [ :b, @counter ]
  end
end

Base class for example objects

# Shared base of the example classes: wires in the facade Mixin and logging,
# and keeps a countdown of remaining rounds.
class Base
  include ActiveObject::Mixin
  include ActiveObject::Logging

  # Example objects log without the indentation used by library classes.
  def _log_prefix; ""; end

  # Start with a single round of activity.
  def initialize
    _log { "" }
    @counter = 1
  end

  # Consume one round. Returns true while rounds remain; once the counter
  # has reached 0, stops this object's facade and returns false.
  def decrement_counter_or_stop
    unless @counter > 0
      _active_facade._active_stop!
      return false
    end
    @counter -= 1
    true
  end
end

Example with Passive Facade

# Driver script: runs the A/B exchange with passive facades.
require 'example_class'
# A nil active_facade makes ClassMethods#new fall back to Facade::Passive.
A.active_facade = B.active_facade = nil
# .new returns the Facade wrapping each object, not the object itself.
a = A.new
b = B.new

# Each object holds the other's facade, so calls between them go through it.
a.b = b
b.a = a

a.foo("Foo") 
b.bar("Bar") 

# Joins the threads of any registered active facades.
ActiveObject::Facade::Active.join

$stderr.puts "DONE!"

Example with Passive Facade – Output

Example with Passive Facade
  T@2148237740 @2148140020 A.new arguments=[]
T@2148237740 @2148139160 A#initialize 
  T@2148237740 @2148138740 ActiveObject::Facade::Passive#initialize target=@2148139160
  T@2148237740 @2148140020 A.new facade=@2148138740
  T@2148237740 @2148139540 B.new arguments=[]
T@2148237740 @2148137420 B#initialize 
  T@2148237740 @2148137000 ActiveObject::Facade::Passive#initialize target=@2148137420
  T@2148237740 @2148139540 B.new facade=@2148137000
  T@2148237740 @2148138740 ActiveObject::Facade::Passive#method_missing b= [#<ActiveObject::Facade::Passive:0x10013f050 @target=#<B:0x10013f398 @_active_facade=#<ActiveObject::Facade::Passive:0x10013f050 ...>, @counter=1>>]
  T@2148237740 @2148137000 ActiveObject::Facade::Passive#method_missing a= [#<ActiveObject::Facade::Passive:0x10013fde8 @target=#<A:0x100140130 @_active_facade=#<ActiveObject::Facade::Passive:0x10013fde8 ...>, @counter=1, @b=#<ActiveObject::Facade::Passive:0x10013f050 @target=#<B:0x10013f398 @_active_facade=#<ActiveObject::Facade::Passive:0x10013f050 ...>, @counter=1>>>>]
  T@2148237740 @2148138740 ActiveObject::Facade::Passive#method_missing foo ["Foo"]
T@2148237740 @2148139160 A#foo msg="Foo" @counter=1
  T@2148237740 @2148137000 ActiveObject::Facade::Passive#method_missing bar ["Foo"]
T@2148237740 @2148137420 B#bar msg="Foo" @counter=1

MORE

Example with Passive Facade – Output – Page 2

  T@2148237740 @2148138740 ActiveObject::Facade::Passive#method_missing foo ["Foo"]
T@2148237740 @2148139160 A#foo msg="Foo" @counter=0
T@2148237740 @2148137420 B#bar result=[:a, 0] 
T@2148237740 @2148139160 A#foo result=[:b, 0] 
  T@2148237740 @2148137000 ActiveObject::Facade::Passive#method_missing bar ["Bar"]
T@2148237740 @2148137420 B#bar msg="Bar" @counter=0
DONE!

Example with Active Facade

# Driver script: runs the A/B exchange with each object behind an Active
# facade.
require 'example_class'
A.active_facade = B.active_facade = ActiveObject::Facade::Active
# .new returns the Facade wrapping each object, not the object itself.
a = A.new
b = B.new

# Each object holds the other's facade, so calls between them are enqueued.
a.b = b
b.a = a

a.foo("Foo") 
b.bar("Bar") 

# Wait for the worker threads of the registered active facades.
ActiveObject::Facade::Active.join

$stderr.puts "DONE!"

Example with Active Facade – Output

Example with Active Facade
  T@2148237740 @2148139980 A.new arguments=[]
T@2148237740 @2148139120 A#initialize 
  T@2148237740 @2148138700 ActiveObject::Facade::Active#initialize target=@2148139120
  T@2148237740 @2148139980 A.new facade=@2148138700
  T@2148237740 @2148139500 B.new arguments=[]
T@2148237740 @2148137320 B#initialize 
  T@2148237740 @2148136900 ActiveObject::Facade::Active#initialize target=@2148137320
  T@2148237740 @2148139500 B.new facade=@2148136900
  T@2148237740 @2148138700 ActiveObject::Facade::Active#method_missing b= [#<ActiveObject::Facade::Active:0x10013ef88 @stopped=false, @running=false, @target=#<B:0x10013f2d0 @_active_facade=#<ActiveObject::Facade::Active:0x10013ef88 ...>, @counter=1>, @mutex=#<Mutex:0x10013ebf0>, @queue=#<Queue:0x10013ebc8>>]
  T@2148237740 @2148138700 ActiveObject::Facade::Active#_active_start! 
  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148237740 @2148138700 ActiveObject::Facade::Active#_active_start! @thread=@T2148134440
  T@2148237740 @2148133300 ActiveObject::Facade::Active::Message#initialize facade=@2148138700 selector=:b= arguments=[#<ActiveObject::Facade::Active:0x10013ef88 @stopped=false, @running=false, @target=#<B:0x10013f2d0 @_active_facade=#<ActiveObject::Facade::Active:0x10013ef88 ...>, @counter=1>, @mutex=#<Mutex:0x10013ebf0>, @queue=#<Queue:0x10013ebc8>>]
  T@2148237740 @2148138700 ActiveObject::Facade::Active#_active_enqueue message=@2148133300 @queue.size=0

MORE

Example with Active Facade – Output – Page 2

  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_dequeue message=@2148133300 @queue.size=0
  T@2148134440 @2148133300 ActiveObject::Facade::Active::Message#invoke! @facade=@2148138700
  T@2148237740 @2148136900 ActiveObject::Facade::Active#method_missing a= [#<ActiveObject::Facade::Active:0x10013fd98 @stopped=false, @running=true, @target=#<A:0x1001400e0 @_active_facade=#<ActiveObject::Facade::Active:0x10013fd98 ...>, @counter=1, @b=#<ActiveObject::Facade::Active:0x10013ef88 @stopped=false, @running=false, @target=#<B:0x10013f2d0 @_active_facade=#<ActiveObject::Facade::Active:0x10013ef88 ...>, @counter=1>, @mutex=#<Mutex:0x10013ebf0>, @queue=#<Queue:0x10013ebc8>>>, @mutex=#<Mutex:0x10013fa00>, @thread=#<Thread:0x10013dc50 sleep>, @queue=#<Queue:0x10013f9d8>>]
  T@2148237740 @2148136900 ActiveObject::Facade::Active#_active_start! 
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148237740 @2148136900 ActiveObject::Facade::Active#_active_start! @thread=@T2148129740
  T@2148237740 @2148128620 ActiveObject::Facade::Active::Message#initialize facade=@2148136900 selector=:a= arguments=[#<ActiveObject::Facade::Active:0x10013fd98 @stopped=false, @running=true, @target=#<A:0x1001400e0 @_active_facade=#<ActiveObject::Facade::Active:0x10013fd98 ...>, @counter=1, @b=#<ActiveObject::Facade::Active:0x10013ef88 @stopped=false, @running=true, @target=#<B:0x10013f2d0 @_active_facade=#<ActiveObject::Facade::Active:0x10013ef88 ...>, @counter=1>, @mutex=#<Mutex:0x10013ebf0>, @thread=#<Thread:0x10013b798 sleep>, @queue=#<Queue:0x10013ebc8>>>, @mutex=#<Mutex:0x10013fa00>, @thread=#<Thread:0x10013dc50 sleep>, @queue=#<Queue:0x10013f9d8>>]
  T@2148237740 @2148136900 ActiveObject::Facade::Active#_active_enqueue message=@2148128620 @queue.size=0
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_dequeue message=@2148128620 @queue.size=0
  T@2148129740 @2148128620 ActiveObject::Facade::Active::Message#invoke! @facade=@2148136900
  T@2148237740 @2148138700 ActiveObject::Facade::Active#method_missing foo ["Foo"]
  T@2148237740 @2148126060 ActiveObject::Facade::Active::Message#initialize facade=@2148138700 selector=:foo arguments=["Foo"]
  T@2148237740 @2148138700 ActiveObject::Facade::Active#_active_enqueue message=@2148126060 @queue.size=0
  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_dequeue message=@2148126060 @queue.size=0
  T@2148134440 @2148126060 ActiveObject::Facade::Active::Message#invoke! @facade=@2148138700

MORE

Example with Active Facade – Output – Page 3

T@2148134440 @2148139120 A#foo msg="Foo" @counter=1
  T@2148134440 @2148136900 ActiveObject::Facade::Active#method_missing bar ["Foo"]
  T@2148134440 @2148122780 ActiveObject::Facade::Active::Message#initialize facade=@2148136900 selector=:bar arguments=["Foo"]
  T@2148134440 @2148136900 ActiveObject::Facade::Active#_active_enqueue message=@2148122780 @queue.size=0
  T@2148237740 @2148136900 ActiveObject::Facade::Active#method_missing bar ["Bar"]
  T@2148237740 @2148120660 ActiveObject::Facade::Active::Message#initialize facade=@2148136900 selector=:bar arguments=["Bar"]
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_dequeue message=@2148122780 @queue.size=0
  T@2148129740 @2148122780 ActiveObject::Facade::Active::Message#invoke! @facade=@2148136900
T@2148129740 @2148137320 B#bar msg="Foo" @counter=1
  T@2148129740 @2148138700 ActiveObject::Facade::Active#method_missing foo ["Foo"]
  T@2148129740 @2148117740 ActiveObject::Facade::Active::Message#initialize facade=@2148138700 selector=:foo arguments=["Foo"]
  T@2148129740 @2148138700 ActiveObject::Facade::Active#_active_enqueue message=@2148117740 @queue.size=0
  T@2148237740 @2148136900 ActiveObject::Facade::Active#_active_enqueue message=@2148120660 @queue.size=1
  T@2148237740 @2148138700 ActiveObject::Facade::Active#join join thr=T@2148134440
  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_dequeue message=@2148117740 @queue.size=0

MORE

Example with Active Facade – Output – Page 4

  T@2148134440 @2148117740 ActiveObject::Facade::Active::Message#invoke! @facade=@2148138700
T@2148134440 @2148139120 A#foo msg="Foo" @counter=0
  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_stop! 
T@2148134440 @2148137320 B#bar result=[:a, 0] 
  T@2148134440 @2148138700 ActiveObject::Facade::Active#_active_start! stopped
DONE!
T@2148129740 @2148139120 A#foo result=[:b, 0] 
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_dequeue message=@2148120660 @queue.size=0
  T@2148129740 @2148120660 ActiveObject::Facade::Active::Message#invoke! @facade=@2148136900
T@2148129740 @2148137320 B#bar msg="Bar" @counter=0
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_stop! 
  T@2148129740 @2148136900 ActiveObject::Facade::Active#_active_start! stopped

Active Facade

Receives messages on behalf of the target object.
Places Message in its Queue.
Manages a Thread to pull Messages from its Queue for invocation.

module ActiveObject
  class Facade
    class Active < self
      # Signal Thread to stop working on queue.
      class Stop < ::Exception; end

      # Active Facade Initialization
      def initialize target; ...; end 
      # Intercept Message
      def method_missing selector, *arguments, &block; ...; end 
      # Message
      class Message; ...; end 
      # Message Queuing
      def _active_enqueue message; ...; end 
      # Start worker Thread
      def _active_start!; ...; end 
  end
end

Intercept Message

Intercept message on behalf of @target.
Construct Message and place it in its Queue.

module ActiveObject
  class Facade
    class Active < self
      # Capture a call made on the facade: log it, start the worker Thread
      # if it is not running yet, wrap the call in a Message and enqueue it.
      # Returns whatever _active_enqueue returns; the target's own result is
      # only available through the block.
      def method_missing selector, *arguments, &block
        _log { "#{selector} #{arguments.inspect}" }
        _active_start! if ! @running
        message = Message.new(self, selector, arguments, block)
        _active_enqueue(message)
      end
    end
  end
end

Message

Encapsulates Ruby message.

module ActiveObject
  class Facade
    class Active < self
      # One intercepted call waiting in an Active facade's queue.
      # Outline only; the "..." bodies are shown in the following sections.
      class Message
        include Logging
        # What to send (selector, arguments), on behalf of which facade,
        # the optional result callback (block) and the requesting thread.
        attr_accessor :facade, :selector, :arguments, :block, :thread
        # Outcome recorded by #invoke!.
        attr_accessor :result, :exception

        # Message Initialization
        def initialize facade, selector, arguments, block; ...; end 
        
        # Message Invocation
        def invoke!; ...; end 
      end
    end
  end
end

Message Initialization

Capture the requesting Thread to return any Exceptions back to requestor.

module ActiveObject
  class Facade
    class Active < self
      class Message
        # Record everything needed to run the call later.
        #
        # facade    - the facade that intercepted the call.
        # selector  - method name to send to the facade's target.
        # arguments - Array of arguments for the call.
        # block     - optional callback that receives the result (may be nil).
        #
        # Also remembers the Thread that created the Message (the requester),
        # so #invoke! can forward exceptions back to it.
        def initialize facade, selector, arguments, block
          _log { "facade=@#{facade.object_id} selector=#{selector.inspect} arguments=#{arguments.inspect}" }
          @facade, @selector, @arguments, @block = facade, selector, arguments, block
          @thread = ::Thread.current
        end
      end
    end
  end
end

Message Invocation

If block was provided, call it with result after Message invocation.
If Exception was raised, forward it to the requesting Thread.

module ActiveObject
  class Facade
    class Active < self
      class Message
        # Send the recorded call to the facade's target and store the return
        # value in @result. If a block was supplied, call it with the result.
        #
        # Any exception raised by the target or by the block is stored in
        # @exception and raised in the requesting Thread (@thread) instead
        # of propagating in the Thread that runs invoke!.
        #
        # NOTE(review): `rescue Exception` is deliberately broad so every
        # failure reaches the requester, but it also intercepts exceptions
        # raised into the executing Thread from outside while the call is
        # running -- confirm this is acceptable.
        def invoke!
          _log { "@facade=@#{@facade.object_id}" }
          @result = @facade._active_target.__send__(@selector, *@arguments)
          @block.call(@result) if @block
        rescue Exception => exc
          @thread.raise(@exception = exc)
        end
      end
    end
  end
end

Active Facade Initialization

module ActiveObject
  class Facade
    class Active < self
      # Wrap +target+ (via Facade#initialize) and prepare the state used by
      # the worker Thread: a Mutex guarding start/stop, the Queue of pending
      # Messages, and the @stopped/@running flags, both initially false.
      def initialize target
        super(target)
        @mutex = Mutex.new
        @queue = Queue.new
        @stopped = false
        @running = false
      end
    end
  end
end

Message Queuing

module ActiveObject
  class Facade
    class Active < self
      # Append +message+ to the queue, logging it first.
      # Does nothing and returns nil once the facade has been stopped.
      def _active_enqueue message
        unless @stopped
          _log { "message=@#{message.object_id} @queue.size=#{@queue.size}" }
          @queue.push message
        end
      end

      # Take the next Message off the queue (waiting until one is
      # available), log it and return it.
      def _active_dequeue
        next_message = @queue.pop
        _log { "message=@#{next_message.object_id} @queue.size=#{@queue.size}" }
        next_message
      end
    end
  end
end

Active Facade Support

module ActiveObject
  class Facade

    # The object this facade wraps.
    def _active_target
      @target
    end

    # The worker Thread of this facade, or nil when it has none.
    def _active_thread
      @thread
    end

    # Registry of facades whose worker Thread is currently running; worker
    # threads add and remove their own facade.
    @@active_facades = nil
    def self.active_facades
      @@active_facades ||= [ ]
    end

    # Wait for the worker Thread of every registered facade to finish.
    #
    # Iterates over a snapshot of the registry: worker threads delete their
    # facade from it as they finish, and iterating the live Array while it
    # shrinks would skip facades and return before all threads are done.
    def self.join
      active_facades.dup.each do | f |
        thr = f._active_thread
        next unless thr
        f._log { "join thr=T@#{thr.object_id}" }
        # Best effort: an error raised by the joined thread must not abort
        # waiting for the remaining ones.
        thr.join rescue nil
      end
    end
  end
end

Start worker Thread

Start a Thread that blocks waiting for Message in its Queue.

module ActiveObject
  class Facade
    class Active < self
      # Start the worker Thread unless the facade is already running, already
      # has a thread, or has been stopped. Always returns the facade itself.
      def _active_start!
        _log { "" }
        # Start-up is decided under @mutex so only one worker is created.
        @mutex.synchronize do
          # Leaves the whole method; a stopped facade is never restarted.
          return self if @running || @thread || @stopped
          @stopped = false
          @thread = Thread.new do 
            _log { "Thread.new" }
            @running = true
            # Register from inside the worker so Facade.join can find it.
            Active.active_facades << self
            # Process messages until _active_stop! flips the flags.
            while @running && ! @stopped
              begin
                # Waits in _active_dequeue until a Message is available.
                _active_dequeue.invoke! if @running && ! @stopped
              rescue Stop => exc
                # Stop is raised into this thread by _active_stop!; the loop
                # condition is re-checked after logging.
                _log { "stopping via #{exc.class}" }
              end
            end
            Active.active_facades.delete(self)
            _log { "stopped" }
            self
          end
          _log { "@thread=@T#{@thread.object_id}" }
          @thread
        end
        self
      end
    end
  end
end

Stop worker Thread

Sends exception to Thread to tell it to stop.

module ActiveObject
  class Facade
    class Active < self
      # Stop the worker Thread. Returns the facade itself.
      #
      # Under @mutex: does nothing if the facade is already stopped, has no
      # thread or is not running; otherwise marks it stopped, clears @thread
      # and takes the thread out. Outside the mutex, raises Stop in that
      # thread if it is still alive.
      def _active_stop!
        _log { "" }
        worker = @mutex.synchronize do
          return self if @stopped || ! @thread || ! @running
          @stopped = true
          @running = false
          thread = @thread
          @thread = nil
          thread
        end
        if worker.alive?
          # Best effort: errors while signalling the worker are ignored.
          worker.raise(Stop.new) rescue nil
        end
        self
      rescue Stop
        # Handle Stop thrown to main thread after last Thread#join.
        # Also reached when the worker stops itself: raising Stop in the
        # current thread surfaces here.
        self
      end
    end
  end
end

Example with Active Distributor

# Driver script: runs the A/B exchange with each object behind a Distributor
# that fans calls out to two Active facades.
# NOTE(review): the recorded output for this example ends with deadlock
# reports while the main thread is joining -- confirm whether that is expected.
require 'example_class'
A.active_facade = B.active_facade = ActiveObject::Facade::Distributor
# .new returns the Distributor wrapping each object, not the object itself.
a = A.new
b = B.new

a.b = b
b.a = a

# Give each distributor two Active facades to distribute work across.
a._active_add_facade! ActiveObject::Facade::Active
a._active_add_facade! ActiveObject::Facade::Active
b._active_add_facade! ActiveObject::Facade::Active
b._active_add_facade! ActiveObject::Facade::Active

a.foo("Foo") 
b.bar("Bar") 

# Wait for the worker threads of the registered active facades.
ActiveObject::Facade::Active.join

$stderr.puts "DONE!"

Example with Active Distributor – Output

Example with Active Distributor
  T@2148237740 @2148139240 A.new arguments=[]
T@2148237740 @2148138380 A#initialize 
  T@2148237740 @2148137960 ActiveObject::Facade::Distributor#initialize target=@2148138380
  T@2148237740 @2148139240 A.new facade=@2148137960
  T@2148237740 @2148138760 B.new arguments=[]
T@2148237740 @2148136580 B#initialize 
  T@2148237740 @2148136160 ActiveObject::Facade::Distributor#initialize target=@2148136580
  T@2148237740 @2148138760 B.new facade=@2148136160
  T@2148237740 @2148137960 ActiveObject::Facade::Distributor#method_missing b= [#<ActiveObject::Facade::Distributor:0x10013e9c0 @target_index=-1, @target=#<B:0x10013ed08 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013e9c0 ...>>, @mutex=#<Mutex:0x10013e628>, @target_list=[]>]
  T@2148237740 @2148137960 ActiveObject::Facade::Distributor#method_missing b= [#<ActiveObject::Facade::Distributor:0x10013e9c0 @target_index=-1, @target=#<B:0x10013ed08 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013e9c0 ...>>, @mutex=#<Mutex:0x10013e628>, @target_list=[]>]
  T@2148237740 @2148136160 ActiveObject::Facade::Distributor#method_missing a= [#<ActiveObject::Facade::Distributor:0x10013f7d0 @target_index=-1, @target=#<A:0x10013fb18 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013f7d0 ...>, @b=#<ActiveObject::Facade::Distributor:0x10013e9c0 @target_index=-1, @target=#<B:0x10013ed08 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013e9c0 ...>>, @mutex=#<Mutex:0x10013e628>, @target_list=[]>>, @mutex=#<Mutex:0x10013f438>, @target_list=[]>]
  T@2148237740 @2148136160 ActiveObject::Facade::Distributor#method_missing a= [#<ActiveObject::Facade::Distributor:0x10013f7d0 @target_index=-1, @target=#<A:0x10013fb18 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013f7d0 ...>, @b=#<ActiveObject::Facade::Distributor:0x10013e9c0 @target_index=-1, @target=#<B:0x10013ed08 @counter=1, @_active_facade=#<ActiveObject::Facade::Distributor:0x10013e9c0 ...>>, @mutex=#<Mutex:0x10013e628>, @target_list=[]>>, @mutex=#<Mutex:0x10013f438>, @target_list=[]>]
  T@2148237740 @2148132020 ActiveObject::Facade::Active#initialize target=@2148132060
  T@2148237740 @2148131440 ActiveObject::Facade::Active#initialize target=@2148133540

MORE

Example with Active Distributor – Output – Page 2

  T@2148237740 @2148130860 ActiveObject::Facade::Active#initialize target=@2148132040
  T@2148237740 @2148130280 ActiveObject::Facade::Active#initialize target=@2148132000
  T@2148237740 @2148137960 ActiveObject::Facade::Distributor#method_missing foo ["Foo"]
  T@2148237740 @2148132020 ActiveObject::Facade::Active#method_missing foo ["Foo"]
  T@2148237740 @2148132020 ActiveObject::Facade::Active#_active_start! 
  T@2148128300 @2148132020 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148237740 @2148132020 ActiveObject::Facade::Active#_active_start! @thread=@T2148128300
  T@2148237740 @2148127120 ActiveObject::Facade::Active::Message#initialize facade=@2148132020 selector=:foo arguments=["Foo"]
  T@2148237740 @2148132020 ActiveObject::Facade::Active#_active_enqueue message=@2148127120 @queue.size=0
  T@2148128300 @2148132020 ActiveObject::Facade::Active#_active_dequeue message=@2148127120 @queue.size=0
  T@2148128300 @2148127120 ActiveObject::Facade::Active::Message#invoke! @facade=@2148132020
T@2148128300 @2148132060 A#foo msg="Foo" @counter=1
  T@2148128300 @2148136160 ActiveObject::Facade::Distributor#method_missing bar ["Foo"]
  T@2148128300 @2148130860 ActiveObject::Facade::Active#method_missing bar ["Foo"]
  T@2148128300 @2148130860 ActiveObject::Facade::Active#_active_start! 

MORE

Example with Active Distributor – Output – Page 3

  T@2148122080 @2148130860 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148237740 @2148136160 ActiveObject::Facade::Distributor#method_missing bar ["Bar"]
  T@2148237740 @2148130280 ActiveObject::Facade::Active#method_missing bar ["Bar"]
  T@2148237740 @2148130280 ActiveObject::Facade::Active#_active_start! 
  T@2148119720 @2148130280 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148128300 @2148130860 ActiveObject::Facade::Active#_active_start! @thread=@T2148122080
  T@2148128300 @2148118340 ActiveObject::Facade::Active::Message#initialize facade=@2148130860 selector=:bar arguments=["Foo"]
  T@2148128300 @2148130860 ActiveObject::Facade::Active#_active_enqueue message=@2148118340 @queue.size=0
  T@2148237740 @2148130280 ActiveObject::Facade::Active#_active_start! @thread=@T2148119720
  T@2148237740 @2148116360 ActiveObject::Facade::Active::Message#initialize facade=@2148130280 selector=:bar arguments=["Bar"]
  T@2148237740 @2148130280 ActiveObject::Facade::Active#_active_enqueue message=@2148116360 @queue.size=0
  T@2148122080 @2148130860 ActiveObject::Facade::Active#_active_dequeue message=@2148118340 @queue.size=0
  T@2148122080 @2148118340 ActiveObject::Facade::Active::Message#invoke! @facade=@2148130860
T@2148122080 @2148132040 B#bar msg="Foo" @counter=1
  T@2148122080 @2148137960 ActiveObject::Facade::Distributor#method_missing foo ["Foo"]

MORE

Example with Active Distributor – Output – Page 4

  T@2148122080 @2148131440 ActiveObject::Facade::Active#method_missing foo ["Foo"]
  T@2148122080 @2148131440 ActiveObject::Facade::Active#_active_start! 
  T@2148110000 @2148131440 ActiveObject::Facade::Active#_active_start! Thread.new
  T@2148119720 @2148130280 ActiveObject::Facade::Active#_active_dequeue message=@2148116360 @queue.size=0
  T@2148119720 @2148116360 ActiveObject::Facade::Active::Message#invoke! @facade=@2148130280
T@2148119720 @2148132000 B#bar msg="Bar" @counter=1
  T@2148119720 @2148137960 ActiveObject::Facade::Distributor#method_missing foo ["Bar"]
  T@2148119720 @2148132020 ActiveObject::Facade::Active#method_missing foo ["Bar"]
  T@2148119720 @2148106260 ActiveObject::Facade::Active::Message#initialize facade=@2148132020 selector=:foo arguments=["Bar"]
  T@2148119720 @2148132020 ActiveObject::Facade::Active#_active_enqueue message=@2148106260 @queue.size=0
  T@2148122080 @2148131440 ActiveObject::Facade::Active#_active_start! @thread=@T2148110000
  T@2148122080 @2148103220 ActiveObject::Facade::Active::Message#initialize facade=@2148131440 selector=:foo arguments=["Foo"]
  T@2148122080 @2148131440 ActiveObject::Facade::Active#_active_enqueue message=@2148103220 @queue.size=0
  T@2148237740 @2148132020 ActiveObject::Facade::Active#join join thr=T@2148128300
  T@2148110000 @2148131440 ActiveObject::Facade::Active#_active_dequeue message=@2148103220 @queue.size=0

MORE

Example with Active Distributor – Output – Page 5

  T@2148110000 @2148103220 ActiveObject::Facade::Active::Message#invoke! @facade=@2148131440
T@2148110000 @2148133540 A#foo msg="Foo" @counter=1
  T@2148110000 @2148136160 ActiveObject::Facade::Distributor#method_missing bar ["Foo"]
  T@2148110000 @2148130860 ActiveObject::Facade::Active#method_missing bar ["Foo"]
  T@2148110000 @2148096380 ActiveObject::Facade::Active::Message#initialize facade=@2148130860 selector=:bar arguments=["Foo"]
  T@2148110000 @2148130860 ActiveObject::Facade::Active#_active_enqueue message=@2148096380 @queue.size=0
  T@2148128300 @2148132020 ActiveObject::Facade::Active#_active_dequeue message=@2148106260 @queue.size=0
  T@2148128300 @2148106260 ActiveObject::Facade::Active::Message#invoke! @facade=@2148132020
T@2148128300 @2148132060 A#foo msg="Bar" @counter=0
  T@2148128300 @2148132020 ActiveObject::Facade::Active#_active_stop! 
T@2148128300 @2148132000 B#bar result=[:a, 0] 
  T@2148128300 @2148132020 ActiveObject::Facade::Active#_active_start! stopped
  T@2148237740 @2148130280 ActiveObject::Facade::Active#join join thr=T@2148119720
T@2148110000 @2148132040 B#bar result=[:a, 0] 
T@2148122080 @2148132060 A#foo result=[:b, 0] 

MORE

Example with Active Distributor – Output – Page 6

  T@2148122080 @2148130860 ActiveObject::Facade::Active#_active_dequeue message=@2148096380 @queue.size=0
  T@2148122080 @2148096380 ActiveObject::Facade::Active::Message#invoke! @facade=@2148130860
T@2148122080 @2148132040 B#bar msg="Foo" @counter=0
  T@2148122080 @2148130860 ActiveObject::Facade::Active#_active_stop! 
T@2148122080 @2148133540 A#foo result=[:b, 0] 
  T@2148122080 @2148130860 ActiveObject::Facade::Active#_active_start! stopped
deadlock 0x100131d60: sleep:-  - ./active_object.rb:196
deadlock 0x100170358: sleep:J(0x100136950) (main) - ./active_object.rb:276
deadlock 0x100136950: sleep:-  - ./active_object.rb:196

Distributor

Distributor distributes work to other Facades via round-robin.

module ActiveObject
  class Facade
    class Distributor < Passive

      # Distributor Initialization
      def initialize target; ...; end 
      # Intercept and Distribute Messages
      def method_missing selector, *arguments, &block; ...; end 

      # Add Multiple Facades
      def _active_add_facade! cls, new_target = nil; ...; end 
  end
end

Distributor Initialization

module ActiveObject
  class Facade
    class Distributor < Passive
      # Wrap +target+ and start with no downstream facades.
      # @target_index starts at -1 so the first round-robin step selects
      # index 0.
      def initialize target
        super(target)
        @mutex = Mutex.new
        @target_list = []
        @target_index = -1
      end
    end
  end
end

Intercept and Distribute Messages

module ActiveObject
  class Facade
    class Distributor < Passive
      # Route a call made on the distributor.
      #
      # With no facades added, behaves like Passive (delegates straight to
      # @target). Otherwise picks the next facade in round-robin order, under
      # @mutex so concurrent callers advance the index consistently, and
      # hands the call to that facade's method_missing.
      #
      # Raises ActiveObject::Error if the selected slot holds no facade.
      def method_missing selector, *arguments, &block
        _log { "#{selector} #{arguments.inspect}" }
        if @target_list.empty?
          super
        else
          target = @mutex.synchronize do
            @target_list[@target_index = 
                         (@target_index + 1) % @target_list.size]
          end
          raise Error, "No target" unless target
          target.method_missing(selector, *arguments, &block)
        end
      end
    end
  end
end

Add Multiple Facades

module ActiveObject
  class Facade
    class Distributor < Passive
      # Add one more facade to distribute work to.
      #
      # cls        - Facade class to instantiate (e.g. Facade::Active).
      # new_target - object the new facade should wrap. When omitted, the
      #              result of calling @target is used if @target is a Proc,
      #              otherwise a dup of @target.
      #
      # Returns the list of facades. The list is modified under @mutex.
      def _active_add_facade! cls, new_target = nil
        @mutex.synchronize do
          target = new_target || 
            (Proc === @target ? @target.call : @target.dup)
          @target_list << cls.new(target)
        end
      end
    end
  end
end

Conclusion

  • Simple, easy-to-use API.
  • Does not require redesign of existing objects.
  • Supports asynchronous results.

Logging

module ActiveObject
  # Mixin that writes one trace line per call to $stderr, in the form
  #   <prefix>T@<thread id> @<object id> <Class#method or Module.method> <msg>
  module Logging
    # Indentation put in front of every line; including classes may override.
    def _log_prefix; "  "; end

    # Log +msg+, or the value of the block when +msg+ is not given.
    # Calling with neither logs an empty message.
    # The method name is taken from the immediate caller's backtrace frame.
    def _log msg = nil
      msg ||= (block_given? ? yield : "")
      frame = caller[0].to_s
      # Older Rubies quote the label as `name'; Ruby 3.4+ writes 'Class#name'.
      label = frame =~ /:in [`'](.*)'\z/ ? $1 : '<<unknown>>'
      # Drop the class qualifier newer Rubies include; namespace supplies it.
      label = label.sub(/\S+[#.](?=[^\s#.]+\z)/, '')
      namespace = Module === self ? "#{self.name}." : "#{self.class.name}#"
      $stderr.puts "#{_log_prefix}T@#{Thread.current.object_id} @#{object_id} #{namespace}#{label} #{msg}"
    end
  end
end