module Log
    attr_accessor :_logger
    def self.included target
      super
      target.send(:extend, ClassMethods)
    end
    @@enabled = false
    def self.enabled
      @@enabled
    end
    def self.enabled= x
      @@enabled = x
    end
    module ClassMethods
      def _log_enabled= x
        (Thread.current[:'ASIR::Log.enabled'] ||= { })[self] = x
      end
      def _log_enabled?
        (Thread.current[:'ASIR::Log.enabled'] ||= { })[self]
      end
    end
    def _log_enabled= x
      @_log_enabled = x
    end
    def _log_enabled?
      ASIR::Log.enabled || 
        @_log_enabled || 
        self.class._log_enabled?
    end
    def _log msg = nil
      return unless _log_enabled?
      msg ||= yield if block_given?
      msg = String === msg ? msg : _log_format(msg)
      msg = "  #{$$} #{Module === self ? self : self.class} #{msg}"
      case @_logger
      when Proc
        @_logger.call msg
      when IO
        @_logger.puts msg
      else
        $stderr.puts msg
      end
      nil
    end
    def _log_result msg
      _log { 
        msg = String === msg ? msg : _log_format(msg);
        "#{msg} => ..." }
      result = yield
      _log { "#{msg} => \n    #{result.inspect}" }
      result
    end
    def _log_format obj
      case obj
      when Exception
        msg = "#{obj.inspect}"
        msg << "\n    #{obj.backtrace * "\n    "}" if false
        msg
      when Array
        obj.map { | x | _log_format x } * ", "
      else
        obj.inspect
      end
    end
  end # module
end # module
require 'asir'
require 'asir/environment'
require 'time'
module ASIR
class Main
  attr_accessor :env, :args, :exit_code
  # Delegate getter/setters to @env.
  [ :verb, :adjective, :object, :identifier,
    :config_rb,
    :verbose,
    :log_dir, :log_file,
    :pid_dir, :pid_file,
  ].
    map{|g| [ g, :"#{g}=" ]}.
    flatten.each do | m |
      define_method(m) { | *args | @env.send(m, *args) }
    end
  attr_accessor :progname
  # Options:
  attr_accessor :force
  # Transport selected from asir.phase = :transport.
  attr_accessor :transport
  def initialize
    self.env = ASIR::Environment.new
    self.progname = File.basename($0)
    self.exit_code = 0
  end
  def parse_args! args = ARGV.dup
    self.args = args
    until args.empty?
      case args.first
      when /^([a-z0-9_]+=)(.*)/i
        k, v = $1.to_sym, $2
        args.shift
        v = v.to_i if v == v.to_i.to_s
        send(k, v)
      else
        break
      end
    end
    self.verb, self.adjective, self.object, self.identifier = args.map{|x| x.to_sym}
    self.identifier ||= :'0'
    self
  end
  def config! *args
    @env.config! *args
  end
  def log_str
    "#{Time.now.gmtime.iso8601(4)} #{$$} #{log_str_no_time}"
  end
  def log_str_no_time
    "#{progname} #{verb} #{adjective} #{object} #{identifier}"
  end
  def run!
    unless verb && adjective && object
      self.exit_code = 1
      return usage!
    end
    config!(:configure)
    # $stderr.puts "log_file = #{log_file.inspect}"
    case self.verb
    when :restart
      self.verb = :stop
      _run_verb! && sleep(1)
      self.verb = :start
      _run_verb!
    else
      _run_verb!
    end
    self
  rescue ::Exception => exc
    $stderr.puts "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    self.exit_code += 1
    self
  end
  def _run_verb!
    sel = :"#{verb}_#{adjective}_#{object}!"
    if verbose >= 3
      $stderr.puts "  verb      = #{verb.inspect}"
      $stderr.puts "  adjective = #{adjective.inspect}"
      $stderr.puts "  object    = #{object.inspect}"
      $stderr.puts "  sel       = #{sel.inspect}"
    end
    send(sel)
  rescue ::Exception => exc
    $stderr.puts "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    self.exit_code += 1
    raise
    nil
  end
  def method_missing sel, *args
    log "method_missing #{sel}" if verbose >= 3
    case sel.to_s
    when /^start_([^_]+)_worker!$/
      _start_worker!
    when /^status_([^_]+)_([^_]+)!$/
      pid = server_pid
      puts "#{log_str} pid #{pid}"
      system("ps -fw -p #{pid}")
    when /^log_([^_]+)_([^_]+)!$/
      puts log_file
    when /^taillog_([^_]+)_([^_]+)!$/
      exec "tail -f #{log_file.inspect}"
    when /^pid_([^_]+)_([^_]+)!$/
      pid = server_pid rescue nil
      alive = process_running? pid
      puts "#{pid_file} #{pid || :NA} #{alive}"
    when /^alive_([^_]+)_([^_]+)!$/
      pid = server_pid rescue nil
      alive = process_running? pid
      puts "#{pid_file} #{pid || :NA} #{alive}" if @verbose
      self.exit_code += 1 unless alive
    when /^stop_([^_]+)_([^_]+)!$/
      kill_server!
    else
      super
    end
  end
  def usage!
    $stderr.puts <<"END"
SYNOPSIS:
  asir [ <<options>> ... ] <<verb>> <<adjective>> <<object>> [ <<identifier>> ]
OPTIONS:
  config_rb=file.rb ($ASIR_LOG_DIR)
  pid_dir=dir/      ($ASIR_PID_DIR)
  log_dir=dir/      ($ASIR_LOG_DIR)
  verbose=[0-9]
VERBS:
  start
  stop
  restart
  status
  log
  pid
  alive
ADJECTIVE-OBJECTs:
  beanstalk conduit
  beanstalk worker
  zmq worker
  webrick worker
  resque conduit
  resque worker
EXAMPLES:
  export ASIR_CONFIG_RB="some_system/asir_config.rb"
  asir start beanstalk conduit
  asir status beanstalk conduit
  asir start webrick worker
  asir pid   webrick worker
  asir start beanstalk worker 1
  asir start beanstalk worker 2
  asir start zmq worker
  asir start zmq worker 1
  asir start zmq worker 2
END
  end
  def start_beanstalk_conduit!
    _start_conduit!
  end
  def start_resque_conduit!
    _start_conduit!
  end
  def _start_conduit!
    config!(:environment)
    self.transport = config!(:transport)
    fork_server! do
      transport.start_conduit! :fork => false
    end
  end
  def _start_worker! type = adjective
    log "start_worker! #{type}"
    type = type.to_s
    fork_server! do
      transport_file = "asir/transport/#{type}"
      log "loading #{transport_file}"
      require transport_file
      _create_transport ASIR::Transport.const_get(type[0..0].upcase + type[1..-1])
      _run_workers!
    end
  end
  def fork_server! cmd = nil, &blk
    pid = Process.fork do
      run_server! cmd, &blk
    end
    log "forked pid #{pid}"
    Process.detach(pid) # Forks a Thread?  We are gonna exit anyway.
    File.open(pid_file, "w+") { | o | o.puts pid }
    File.chmod(0666, pid_file) rescue nil
    # Wait and check if process still exists.
    sleep 3
    unless process_running? pid
      raise "Server process #{pid} died to soon?"
    end
    self
  end
  def run_server! cmd = nil
    lf = File.open(log_file, "a+")
    File.chmod(0666, log_file) rescue nil
    $stdin.close rescue nil
    STDIN.close rescue nil
    STDOUT.reopen(lf)
    $stdout.reopen(lf) if $stdout.object_id != STDOUT.object_id
    STDERR.reopen(lf)
    $stderr.reopen(lf) if $stderr.object_id != STDERR.object_id
    # Process.daemon rescue nil # Ruby 1.9.x only.
    lf.puts "#{log_str} starting pid #{$$}"
    begin
      if cmd
        exec(cmd)
      else
        yield
      end
    ensure
      lf.puts "#{log_str} finished pid #{$$}"
      File.unlink(pid_file) rescue nil
    end
    self
  rescue ::Exception => exc
    msg = "ERROR pid #{$$}\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}"
    log msg, :stderr
    raise
    self
  end
  def kill_server!
    log "#{log_str} kill"
    pid = server_pid
    stop_pid! pid
  rescue ::Exception => exc
    log "#{log_str} ERROR\n#{exc.inspect}\n  #{exc.backtrace * "\n  "}", :stderr
    raise
  end
  def log msg, to_stderr = false
    if to_stderr
      $stderr.puts "#{log_str_no_time} #{msg}"
    end
    File.open(log_file, "a+") do | log |
      log.puts "#{log_str} #{msg}"
    end
  end
  def server_pid
    pid = File.read(pid_file).chomp!
    pid.to_i
  end
  def _create_transport default_class
    config!(:environment)
    case transport = config!(:transport)
    when default_class
      self.transport = transport
    else
      raise "Expected config to return a #{default_class}, not a #{transport.class}"
    end
  end
  def worker_pids
    (@worker_pids ||= { })[adjective] ||= { }
  end
  def _run_workers!
    $0 = "#{progname} #{adjective} #{object} #{identifier}"
    worker_id = 0
    transport.prepare_server!
    worker_processes = transport[:worker_processes] || 1
    (worker_processes - 1).times do
      wid = worker_id += 1
      pid = Process.fork do
        _run_transport_server! wid
      end
      Process.setgprp(pid, 0) rescue nil
      worker_pids[wid] = pid
      log "forked #{wid} pid #{pid}"
    end
    _run_transport_server!
  ensure
    log "worker 0 stopped"
    _stop_workers!
  end
  def _run_transport_server! wid = 0
    log "running transport worker #{transport.class} #{wid}"
    config!(:start)
    $0 += " #{wid} #{transport.uri rescue nil}"
    old_arg0 = $0.dup
    after_receive_message = transport.after_receive_message || lambda { | transport, message | nil }
    transport.after_receive_message = lambda do | transport, message |
      $0 = "#{old_arg0} #{transport.message_count} #{message.identifier}"
      after_receive_message.call(transport, message)
    end
    transport.run_server!
    self
  end
  def _stop_workers!
    workers = worker_pids.dup
    worker_pids.clear
    workers.each do | wid, pid |
      config!(:stop)
      stop_pid! pid, "wid #{wid} "
    end
    workers.each do | wid, pid |
      wr = Process.waitpid(pid) rescue nil
      log "stopped #{wid} pid #{pid} => #{wr.inspect}", :stderr
    end
  ensure
    worker_pids.clear
  end
  def stop_pid! pid, msg = nil
    log "stopping #{msg}pid #{pid}", :stderr
    if process_running? pid
      log "TERM pid #{pid}"
      Process.kill('TERM', pid) rescue nil
      sleep 3
      if force or process_running? pid
        log "KILL pid #{pid}", :stderr
        Process.kill('KILL', pid) rescue nil
      end
      if process_running? pid
        log "cant-stop pid #{pid}", :stderr
      end
    else
      log "not-running? pid #{pid}", :stderr
    end
  end
  def process_running? pid
    case pid
    when false, nil
      pid
    when Integer
      Process.kill(0, pid)
    else
      raise TypeError, "expected false, nil, Integer; given #{pid.inspect}"
    end
    true
  rescue ::Errno::ESRCH
    false
  rescue ::Exception => exc
    $stderr.puts "  DEBUG: process_running? #{pid} => #{exc.inspect}"
    false
  end
end # class
end # module
module ASIR
  class Message
    module Delay
      # Returns the number of seconds from now, that the message should be delayed.
      # If message.delay is Numeric, sets message.delay to the Time to delay til.
      # If message.delay is Time, returns (now - message.delay).to_f
      # Returns Float if message.delay was set, or nil.
      # Returns 0 if delay has already expired.
      def relative_message_delay! message, now = nil
        case delay = message.delay
        when nil
        when Numeric
          now ||= Time.now
          delay = delay.to_f
          message.delay = (now + delay).utc
        when Time
          now ||= Time.now
          delay = (delay - now).to_f
          delay = 0 if delay < 0
        else
          raise TypeError, "Expected message.delay to be Numeric or Time, given #{delay.class}"
        end
        delay
      end
      def wait_for_delay! message
        while (delay = relative_message_delay!(message)) && delay > 0 
          sleep delay
        end
        self
      end
    end
  end
end
module ASIR