class DBus::MessageQueue

Encapsulates a socket so that we can {#push} and {#pop} {Message}s.

Constants

MSG_BUF_SIZE

The buffer size for messages.

Attributes

socket[R]

The socket that is used to connect with the bus.

Public Class Methods

new(address) click to toggle source
# File lib/dbus/message_queue.rb, line 24
# Set up the queue state and immediately connect to the bus.
#
# @param address [String] bus address string; it is logged, stored in
#   @address and then consumed by #connect
def initialize(address)
  DBus.logger.debug "MessageQueue: #{address}"
  # Guards socket reads and writes in #pop and #push.
  @mutex = Mutex.new
  @is_tcp = false
  @address = address
  # A single preallocated read buffer for the socket keeps allocations down.
  @read_buffer = String.new(capacity: MSG_BUF_SIZE)
  @buffer = ""
  connect
end

Public Instance Methods

<<(message)
Alias for: push
pop(blocking: true) click to toggle source

@param blocking [Boolean]

true:  wait to return a {Message};
false: may return `nil`

@return [Message,nil] one message, or nil if none is available

@raise EOFError

@todo failure modes

# File lib/dbus/message_queue.rb, line 41
# Take one message off the queue.
#
# The socket is drained into the buffer once up front. When +blocking+ is
# true and no complete message is buffered yet, the method waits in
# IO.select for the socket to become readable and retries until a message
# can be returned.
#
# @param blocking [Boolean] true: wait for a message; false: may return nil
# @return [Message,nil] one message, or nil if none is available
def pop(blocking: true)
  # FIXME: this is not enough, the R/W test deadlocks on shared connections
  @mutex.synchronize do
    buffer_from_socket_nonblock
    message = message_from_buffer_nonblock
    # Only loop when the caller allows us to block.
    while blocking && message.nil?
      readable, = IO.select([@socket])
      next unless readable && readable[0] == @socket

      buffer_from_socket_nonblock
      message = message_from_buffer_nonblock
    end
    message
  end
end
push(message) click to toggle source
# File lib/dbus/message_queue.rb, line 60
# Marshal a message and write it to the socket while holding the mutex.
#
# @param message [#marshall] the message to send
# @return whatever the socket's #write returns
def push(message)
  @mutex.synchronize { @socket.write(message.marshall) }
end
Also aliased as: <<

Private Instance Methods

connect() click to toggle source

Connect to the bus and initialize the connection.

# File lib/dbus/message_queue.rb, line 70
# Connect to the bus, trying each address in turn until one succeeds.
#
# @address is a ";"-separated list of server addresses, each of the form
# "transport:key=value,key=value". %XX escapes in values are decoded
# before the parameters are handed to the transport-specific method.
#
# Addresses without a key/value part (such as "autolaunch:"), key/value
# entries without "=", and unknown transports are skipped rather than
# raising NoMethodError, so a later address in the list still gets a
# chance.
#
# @return [String,nil] the first address whose connect_to_* call returned
#   a truthy value, or nil if none did
# TODO: decide how failure should be reported to the caller.
def connect
  # connect to first one that succeeds
  @address.split(";").find do |candidate|
    # Limit 2: only the first ":" separates the transport from its params.
    transport, keyvaluestring = candidate.split(":", 2)
    # to_s: "autolaunch:" splits into a single element, leaving nil here.
    params = keyvaluestring.to_s.split(",").each_with_object({}) do |kv, acc|
      key, escaped_value = kv.split("=", 2)
      next if escaped_value.nil? # malformed entry without "="

      acc[key] = escaped_value.gsub(/%(..)/) { [Regexp.last_match(1)].pack("H2") }
    end

    case transport
    when "unix" then connect_to_unix(params)
    when "tcp" then connect_to_tcp(params)
    when "launchd" then connect_to_launchd(params)
    end
    # any other transport: ignored (the case yields nil, so find moves on)
  end
end
connect_to_launchd(params) click to toggle source
# File lib/dbus/message_queue.rb, line 136
# Connect to a bus whose Unix socket path is published by launchd.
#
# The socket path is obtained by running `launchctl getenv <env>` and is
# then passed to #connect_to_unix as its "path" parameter.
#
# The command is run with an argument vector rather than an interpolated
# shell string: the variable name comes from the (percent-decoded) bus
# address and must not be interpreted by a shell.
#
# @param params [Hash{String => String}] address parameters; "env" names
#   the launchd environment variable that holds the socket path
# @return whatever #connect_to_unix returns
def connect_to_launchd(params)
  socket_var = params["env"].to_s
  socket_path = IO.popen(["launchctl", "getenv", socket_var], &:read).chomp
  connect_to_unix "path" => socket_path
end
connect_to_tcp(params) click to toggle source

Connect to a bus over tcp and initialize the connection.

# File lib/dbus/message_queue.rb, line 98
# Connect to a bus over TCP and initialize the connection.
#
# When "host" or "port" is missing, an error naming the supplied
# parameters is printed and nil is returned. When the connection or its
# initialization fails, the error is printed and the process exits with
# status 1.
#
# @param params [Hash{String => String}] address parameters; "host" and
#   "port" are required
# @return [true,nil] true once connected, nil if the parameters are unusable
def connect_to_tcp(params)
  host = params["host"]
  port = params["port"]
  unless host && port
    # Danger, Will Robinson: the specified host/port pair is not usable
    puts "Error: supplied params: #{params}, unusable! sorry."
    return
  end

  begin
    # initialize the tcp socket
    @socket = TCPSocket.new(host, port.to_i)
    @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    init_connection
    @is_tcp = true
  rescue StandardError => e
    # StandardError only: signals and SystemExit must not be swallowed here.
    puts "Oops:", e
    puts "Error: Could not establish connection to: #{host}:#{port}, will now exit."
    exit(1) # a little harsh
  end
end
connect_to_unix(params) click to toggle source

Connect to a bus over a Unix domain socket (given by either an "abstract" name or a filesystem "path") and initialize the connection.

# File lib/dbus/message_queue.rb, line 120
# Connect to a bus over a Unix domain socket and initialize the connection.
#
# The socket address comes from params["abstract"] when that key is
# present, otherwise from params["path"]. If neither is present the
# address stays nil and is passed to Socket#connect as is.
#
# @param params [Hash{String => String}] address parameters
# @return whatever #init_connection returns
def connect_to_unix(params)
  @socket = Socket.new(Socket::Constants::PF_UNIX, Socket::Constants::SOCK_STREAM, 0)
  @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

  abstract_name = params["abstract"]
  sockaddr =
    if !abstract_name.nil?
      # Hand-built address: the two leading bytes are ordered by host
      # endianness, then a zero byte precedes the abstract name.
      if HOST_END == LIL_END
        "\1\0\0#{abstract_name}"
      else
        "\0\1\0#{abstract_name}"
      end
    elsif !params["path"].nil?
      Socket.pack_sockaddr_un(params["path"])
    end

  @socket.connect(sockaddr)
  init_connection
end
init_connection() click to toggle source

Initialize the connection to the bus.

# File lib/dbus/message_queue.rb, line 143
# Initialize the connection to the bus by running the authentication
# client over the current socket.
#
# @return whatever Authentication::Client#authenticate returns
def init_connection
  Authentication::Client.new(@socket).authenticate
end