class RelpServer

Public Class Methods

new(host,port,required_commands=[],ssl_context=nil)
# File lib/logstash/util/relp.rb, line 104
# Binds a listening RELP server socket on host:port.
#
# host              - address passed to TCPServer.new
# port              - port passed to TCPServer.new
# required_commands - extra RELP commands a client must offer; stored in
#                     @required_relp_commands
# ssl_context       - optional SSL context; when given, the TCP server is
#                     wrapped in an OpenSSL::SSL::SSLServer
#
# Re-raises Errno::EADDRINUSE (after logging it) when the address is taken.
def initialize(host,port,required_commands=[],ssl_context=nil)
  @logger = ::Cabin::Channel.get(LogStash)

  # Set before the listener exists; replaced by the real server object below.
  # NOTE(review): presumably also read elsewhere as an "I am a server" flag - confirm.
  @server = true

  # Commands that belong to the basic protocol but are only valid in one
  # direction (rsp, close etc.). TODO: check for others
  @basic_relp_commands = ['close']

  # Extra commands we insist on; connections that do not offer them are refused.
  @required_relp_commands = required_commands

  begin
    @server = TCPServer.new(host, port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start RELP server: Address in use",
                  :host => host, :port => port)
    raise
  end

  @server = OpenSSL::SSL::SSLServer.new(@server, ssl_context) if ssl_context

  flavour = ssl_context ? 'SSL-enabled ' : ''
  @logger.info("Started #{flavour}RELP Server", :host => host, :port => port)
end

Public Instance Methods

accept()
# File lib/logstash/util/relp.rb, line 128
# Waits for the next client on the listening server.
#
# Returns a two-element array: this server object and the accepted socket.
def accept
  client = @server.accept
  @logger.debug("New socket created")
  [self, client]
end
ack(socket, txnr)
# File lib/logstash/util/relp.rb, line 210
# Acknowledges transaction +txnr+ by writing an rsp frame carrying
# '200 OK' to +socket+.
#
# Returns whatever frame_write returns.
def ack(socket, txnr)
  reply = {
    'txnr'    => txnr,
    'command' => 'rsp',
    'message' => '200 OK'
  }
  frame_write(socket, reply)
end
relp_setup_connection(socket)
# File lib/logstash/util/relp.rb, line 134
# Performs the server side of the RELP 'open' handshake on +socket+.
#
# Reads one frame. If it is an 'open' frame whose offer contains a
# relp_version and every command in @required_relp_commands, a '200 OK'
# rsp frame is written back and the result of frame_write is returned.
#
# Raises InappropriateCommand if the first frame is not 'open'.
# Raises RelpError if the offer carries no relp_version.
# Raises InsufficientCommands if a required command is not offered (a
#   '500' rsp frame naming the missing commands is written first).
# In every failure case the connection is closed via serverclose before
# raising.
def relp_setup_connection(socket)
  frame = self.frame_read(socket)

  unless frame['command'] == 'open'
    self.serverclose(socket)
    # Interpolation (rather than +) so a nil command still yields the
    # intended exception instead of a NoMethodError.
    raise InappropriateCommand, "#{frame['command']} expecting open"
  end

  # The offer is a newline-separated list of key=value pairs.
  offer = Hash[frame['message'].to_s.scan(/^(.*)=(.*)$/)]

  if offer['relp_version'].nil?
    @logger.warn("No relp version specified")
    #if no version specified, relp spec says we must close connection
    self.serverclose(socket)
    raise RelpError, 'No relp_version specified'
  end

  # An offer without a commands= line is treated as offering no commands.
  offered_commands = offer['commands'].to_s.split(',')
  # Array difference: whatever is left is required but was not offered.
  missing_commands = @required_relp_commands - offered_commands

  unless missing_commands.empty?
    @logger.warn("Not all required commands are available", :required => @required_relp_commands, :offer => offer['commands'])
    #Tell them why we're closing the connection:
    response_frame = {
      'txnr'    => frame['txnr'],
      'command' => 'rsp',
      # Built as a single expression: a line that *starts* with + is parsed
      # by Ruby as a separate statement, which silently truncated this text.
      'message' => "500 Required command(s) #{missing_commands.join(',')} not offered"
    }
    self.frame_write(socket, response_frame)
    self.serverclose(socket)
    raise InsufficientCommands,
          "#{offer['commands']} offered, require #{@required_relp_commands.join(',')}"
  end

  #attempt to set up connection
  message = '200 OK '
  message += "relp_version=#{RelpVersion}\n"
  message += "relp_software=#{RelpSoftware}\n"
  message += "commands=#{@required_relp_commands.join(',')}" #TODO: optional ones

  response_frame = {
    'txnr'    => frame['txnr'],
    'command' => 'rsp',
    'message' => message
  }
  self.frame_write(socket, response_frame)
end
serverclose(socket)
# File lib/logstash/util/relp.rb, line 195
# Tells the peer the server is closing the connection, then closes +socket+.
#
# A 'serverclose' frame with txnr 0 is written first. ConnectionClosed
# raised by frame_write is swallowed. The socket is closed in an ensure
# clause so the descriptor is released even when the write fails; errors
# from the close itself are ignored (best effort).
#
# Returns nil.
def serverclose(socket)
  frame = { 'txnr' => 0, 'command' => 'serverclose' }
  begin
    self.frame_write(socket, frame)
  rescue ConnectionClosed
    # Peer is already gone; nothing left to announce.
  ensure
    begin
      socket.close
    rescue StandardError
      # Deliberately ignored: the socket may already be closed.
    end
  end
  nil
end
shutdown()
# File lib/logstash/util/relp.rb, line 206
# Closes the listening server. Best effort: any StandardError raised by
# the close is swallowed and nil is returned instead.
def shutdown
  begin
    @server.close
  rescue StandardError
    nil
  end
end
syslog_read(socket)

Reads the next frame and returns it without acknowledging it; the caller must ack the frame separately.

# File lib/logstash/util/relp.rb, line 176
# Reads the next frame from +socket+ and returns it when it is a 'syslog'
# frame. The frame is not acknowledged here.
#
# Raises ConnectionClosed when the client sent 'close'; an rsp frame is
#   written and the connection closed via serverclose first.
# Raises InappropriateCommand for any other command, after closing the
#   connection via serverclose.
def syslog_read(socket)
  received = frame_read(socket)
  verb = received['command']
  return received if verb == 'syslog'

  unless verb == 'close'
    # The client is trying to do something unexpected.
    serverclose(socket)
    raise InappropriateCommand, verb + ' expecting syslog'
  end

  # The client is closing the connection: acknowledge the close, then act on it.
  reply = { 'txnr' => received['txnr'], 'command' => 'rsp' }
  frame_write(socket, reply)
  serverclose(socket)
  raise ConnectionClosed
end