class LogStash::Inputs::Relp

Read RELP events over a TCP socket.

For more information about RELP, see <http://www.rsyslog.com/doc/imrelp.html>.

This protocol implements application-level acknowledgements to help protect against message loss.

Message acks only function as far as messages being put into the queue for filters; anything lost after that point will not be retransmitted.

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/relp.rb, line 48
# Builds the plugin instance.
#
# Every argument is forwarded untouched to the superclass constructor, and
# the RELP server handle starts out as nil.
def initialize(*args)
  super(*args)
  @relp_server = nil

  # Mix SocketPeer into both the plain TCP and the SSL socket classes
  # (monkey patch), in the same order as before: TCPSocket first.
  [TCPSocket, OpenSSL::SSL::SSLSocket].each do |socket_class|
    socket_class.send(:include, ::LogStash::Util::SocketPeer)
  end
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/relp.rb, line 154
# Shuts down the RELP server if one is currently held.
#
# The server is created in register, and the plugin can be closed after
# register without run/stop ever being invoked, so the server has to be
# released here as well. Any StandardError raised by the shutdown call is
# discarded. Always returns nil.
def close
  return unless @relp_server

  begin
    @relp_server.shutdown
  rescue StandardError
    # best effort: shutdown failures are deliberately ignored
    nil
  end
  @relp_server = nil
end
register() click to toggle source
# File lib/logstash/inputs/relp.rb, line 58
# Announces the listener, prepares SSL when enabled, and creates the RELP
# server bound to @host/@port.
#
# When SSL is enabled but @ssl_verify is exactly false, a multi-line warning
# about disabled certificate verification is logged.
def register
  listen_address = "#{@host}:#{@port}"
  @logger.info("Starting relp input listener", :address => listen_address)

  if @ssl_enable
    initialize_ssl_context
    # Only an explicit false triggers the warning; nil does not.
    if @ssl_verify == false
      unsafe_notice = "** WARNING ** Detected UNSAFE options in relp input configuration!\n" \
                      "** WARNING ** You have enabled encryption but DISABLED certificate verification.\n" \
                      "** WARNING ** To make sure your data is secure change :ssl_verify to true"
      @logger.warn(unsafe_notice)
    end
  end

  # The server (and its socket) is opened here rather than in run, so the
  # close method must release it too: register may be called without run and
  # stop ever following.
  @relp_server = RelpServer.new(@host, @port, ['syslog'], @ssl_context)
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/relp.rb, line 122
# Accept loop: keeps accepting RELP connections and hands each one to
# connection_thread until stop? reports true.
#
# output_queue is passed through to connection_thread for every accepted
# connection. RELP handshake problems and SSL errors are logged and the loop
# continues; any other StandardError is re-raised unless the plugin is
# stopping.
def run(output_queue)
  until stop?
    begin
      accepted_server, accepted_socket = @relp_server.accept
      connection_thread(accepted_server, accepted_socket, output_queue)
    rescue Relp::InvalidCommand, Relp::InappropriateCommand => command_error
      @logger.warn("Relp client trying to open connection with something other than open:#{command_error.message}")
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue Relp::ConnectionClosed
      @logger.debug('Relp Connection closed')
    rescue OpenSSL::SSL::SSLError => ssl_error
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssl_error, :backtrace => ssl_error.backtrace)
    rescue StandardError => unexpected
      # An error that occurs while the plugin is stopping is ignored so the
      # loop can exit; otherwise it propagates to the caller.
      raise unexpected unless stop?
    end
  end
end
stop() click to toggle source
# File lib/logstash/inputs/relp.rb, line 144
# Force-closes the RELP server so that blocking reads are interrupted with an
# IO exception and the threads using it can exit.
#
# Any StandardError raised while shutting down (close errors, invalid socket)
# is discarded. Does nothing when no server is held. Always returns nil.
def stop
  return unless @relp_server

  begin
    @relp_server.shutdown
  rescue StandardError
    # catch-all: shutdown problems are intentionally dropped
    nil
  end
  @relp_server = nil
end

Private Instance Methods

connection_thread(server, socket, output_queue) click to toggle source
# File lib/logstash/inputs/relp.rb, line 166
# Serves one accepted RELP connection on its own thread.
#
# The thread performs the RELP connection setup on +socket+, then streams
# frames into +output_queue+ via relp_stream. The socket is always closed
# when the thread finishes, whatever the outcome.
#
# server       - object providing relp_setup_connection (and used by relp_stream)
# socket       - the accepted client socket; must respond to peer and close
# output_queue - queue that decoded events are pushed onto
#
# Returns the started Thread.
def connection_thread(server, socket, output_queue)
  # Block parameters are named differently from the method arguments so they
  # do not shadow them.
  Thread.start(server, socket, output_queue) do |relp_server, client_socket, queue|
    # Stays nil (and interpolates as an empty string) if setup fails before
    # the peer address has been read.
    peer = nil
    begin
      relp_server.relp_setup_connection(client_socket)
      peer = client_socket.peer
      @logger.debug("Relp Connection to #{peer} created")
      relp_stream(relp_server, client_socket, queue, peer)
    rescue Relp::ConnectionClosed
      @logger.debug("Relp Connection to #{peer} Closed")
    rescue Relp::RelpError => e
      @logger.warn('Relp error: '+e.class.to_s+' '+e.message)
      #TODO: Still not happy with this, are they all warn level?
      #Will this catch everything I want it to?
      #Relp spec says to close connection on error, ensure this is the case
    rescue => e
      # Unexpected failures used to vanish without a trace. They still do not
      # escape the thread, but are now reported. Errors seen while the plugin
      # is stopping stay silent, because stop force-closes the sockets.
      unless stop?
        @logger.warn("Relp connection handler failed",
                     :peer => peer, :exception => e, :backtrace => e.backtrace)
      end
    ensure
      client_socket.close rescue nil
    end
  end
end
initialize_ssl_context() click to toggle source
# File lib/logstash/inputs/relp.rb, line 81
# Creates @ssl_context from the configured certificate and RSA key files.
#
# The certificate is read from @ssl_cert and the key from @ssl_key (unlocked
# with @ssl_key_passphrase.value). When @ssl_verify is truthy, a certificate
# store is built from the system default paths plus @ssl_cacert (a directory
# or a single file) and peer verification is made mandatory.
def initialize_ssl_context
  @ssl_context = OpenSSL::SSL::SSLContext.new

  certificate_pem = File.read(@ssl_cert)
  @ssl_context.cert = OpenSSL::X509::Certificate.new(certificate_pem)

  key_pem = File.read(@ssl_key)
  @ssl_context.key = OpenSSL::PKey::RSA.new(key_pem, @ssl_key_passphrase.value)

  # Without verification the context is complete at this point.
  return unless @ssl_verify

  @cert_store = OpenSSL::X509::Store.new
  # Start from the system default certificate locations.
  @cert_store.set_default_paths
  unless @ssl_cacert.nil?
    # A directory is registered as a lookup path, anything else as a file.
    loader = File.directory?(@ssl_cacert) ? :add_path : :add_file
    @cert_store.public_send(loader, @ssl_cacert)
  end

  @ssl_context.cert_store = @cert_store
  @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
relp_stream(relpserver,socket,output_queue,client_address) click to toggle source
# File lib/logstash/inputs/relp.rb, line 102
# Reads syslog frames from one RELP connection until stop? reports true.
#
# Each frame's "message" is run through @codec; every resulting event is
# decorated, tagged with the client address as "host" and pushed onto
# output_queue. With SSL and verification both enabled, the peer certificate
# subject is stored as "sslsubject" when the event has no "ssl_context"
# field. The frame is acknowledged after all of its events have been queued.
def relp_stream(relpserver,socket,output_queue,client_address)
  until stop?
    relp_frame = relpserver.syslog_read(socket)
    payload = relp_frame["message"]

    @codec.decode(payload) do |decoded|
      decorate(decoded)
      decoded.set("host", client_address)

      wants_subject = @ssl_enable && @ssl_verify && decoded.get("ssl_context").nil?
      decoded.set("sslsubject", socket.peer_cert.subject) if wants_subject

      output_queue << decoded
    end

    # Reaching this point means the events made it into the queue for
    # filtering. Waiting for outputs before acknowledging does not seem
    # possible without fundamentally breaking the plugin architecture.
    relpserver.ack(socket, relp_frame['txnr'])
  end
end