class LogStash::Inputs::Relp
Read RELP events over a TCP socket.
For more information about RELP, see <www.rsyslog.com/doc/imrelp.html>
This protocol implements application-level acknowledgements to help protect against message loss.
Message acknowledgements only guarantee that a message has been put into the queue for filters; anything lost after that point will not be retransmitted.
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/relp.rb, line 48
# Builds the input and prepares the socket classes it relies on.
#
# Every argument is forwarded unchanged to the superclass constructor. The
# input starts without a server (@relp_server is nil), and
# ::LogStash::Util::SocketPeer is included into both the plain TCP socket class
# and the SSL socket class.
def initialize(*args)
  super(*args)
  @relp_server = nil

  # monkey patch TCPSocket and SSLSocket to include socket peer
  [TCPSocket, OpenSSL::SSL::SSLSocket].each do |socket_class|
    socket_class.module_eval { include ::LogStash::Util::SocketPeer }
  end
end
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/relp.rb, line 154
# Shuts down the server held in @relp_server, if any, and forgets it.
#
# see related comment in register: we must make sure to close the @relp_server here
# because it is created in the register method and we could be in the context of having
# register called but never run & stop, only close.
#
# Always returns nil. Errors raised by the shutdown are discarded on purpose.
def close
  return unless @relp_server

  begin
    @relp_server.shutdown
  rescue StandardError
    # Best effort: a failing shutdown must not prevent the reference from
    # being cleared below, so the error is intentionally dropped.
  end
  @relp_server = nil
end
register()
click to toggle source
# File lib/logstash/inputs/relp.rb, line 58
# Logs the listener address, prepares the SSL context when SSL is enabled, and
# creates the RELP server stored in @relp_server.
#
# When SSL is enabled but @ssl_verify is exactly false, a multi-line warning
# about the disabled certificate verification is logged.
def register
  @logger.info("Starting relp input listener", :address => "#{@host}:#{@port}")

  initialize_ssl_context if @ssl_enable

  if @ssl_enable && @ssl_verify == false
    warning_lines = [
      "** WARNING ** Detected UNSAFE options in relp input configuration!",
      "** WARNING ** You have enabled encryption but DISABLED certificate verification.",
      "** WARNING ** To make sure your data is secure change :ssl_verify to true"
    ]
    @logger.warn(warning_lines.join("\n"))
  end

  # note that since we are opening a socket (through @relp_server) in register,
  # we must also make sure we close it in the close method even if we also close
  # it in the stop method since we could have a situation where register is called
  # but not run & stop.
  @relp_server = RelpServer.new(@host, @port, ['syslog'], @ssl_context)
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/relp.rb, line 122
# Accept loop: keeps accepting clients from @relp_server until stop? is true
# and hands each accepted connection to connection_thread.
#
# output_queue - passed through untouched to connection_thread.
#
# RELP protocol and SSL errors raised while accepting are logged and the loop
# continues. Any other error is re-raised unless the plugin is stopping.
def run(output_queue)
  until stop?
    begin
      relp, client = @relp_server.accept
      connection_thread(relp, client, output_queue)
    rescue Relp::InvalidCommand, Relp::InappropriateCommand => bad_command
      @logger.warn("Relp client trying to open connection with something other than open:#{bad_command.message}")
    rescue Relp::InsufficientCommands
      @logger.warn('Relp client incapable of syslog')
    rescue Relp::ConnectionClosed
      @logger.debug('Relp Connection closed')
    rescue OpenSSL::SSL::SSLError => ssl_error
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssl_error, :backtrace => ssl_error.backtrace)
    rescue => unexpected
      # if this exception occurred while the plugin is stopping
      # just ignore and exit
      raise unexpected unless stop?
    end
  end
end
stop()
click to toggle source
# File lib/logstash/inputs/relp.rb, line 144
# Shuts down the server held in @relp_server, if any, and forgets it.
#
# force close @relp_server which will escape any blocking read with a IO exception
# and any thread using them will exit.
#
# Always returns nil. Errors raised by the shutdown are discarded on purpose.
def stop
  return unless @relp_server

  begin
    @relp_server.shutdown
  rescue StandardError
    # Best effort: discard any close errors or invalid socket so that the
    # reference is always cleared below.
  end
  @relp_server = nil
end
Private Instance Methods
connection_thread(server, socket, output_queue)
click to toggle source
# File lib/logstash/inputs/relp.rb, line 166
# Serves one accepted client on a thread of its own and returns that thread.
#
# server       - object that answers relp_setup_connection; also handed to relp_stream
# socket       - the client socket; it is always closed when the thread finishes
# output_queue - handed on to relp_stream
#
# No exception escapes the thread: a closed connection is logged at debug
# level, RELP errors at warn level, and anything else at debug level.
def connection_thread(server, socket, output_queue)
  # Distinct block parameter names avoid shadowing the method parameters.
  Thread.start(server, socket, output_queue) do |relp, client, queue|
    # Stays nil when the connection fails before the peer could be read.
    peer = nil
    begin
      relp.relp_setup_connection(client)
      peer = client.peer
      @logger.debug("Relp Connection to #{peer} created")
      relp_stream(relp, client, queue, peer)
    rescue Relp::ConnectionClosed
      @logger.debug("Relp Connection to #{peer} Closed")
    rescue Relp::RelpError => e
      @logger.warn("Relp error: #{e.class} #{e.message}")
      #TODO: Still not happy with this, are they all warn level?
      #Will this catch everything I want it to?
      #Relp spec says to close connection on error, ensure this is the case
    rescue => e
      # Other exceptions still only end this connection, but leave a trace
      # instead of vanishing silently.
      @logger.debug("Relp connection thread stopped by unexpected error",
                    :peer => peer, :exception => e.class.to_s, :message => e.message)
    ensure
      begin
        client.close
      rescue StandardError
        # Best effort: the socket may already be closed or invalid.
      end
    end
  end
end
initialize_ssl_context()
click to toggle source
# File lib/logstash/inputs/relp.rb, line 81
# Builds @ssl_context from the configured certificate and RSA key files.
#
# Reads the certificate from @ssl_cert and the private key from @ssl_key. When
# @ssl_verify is truthy, a certificate store is created from the system default
# paths plus @ssl_cacert (a directory or a file, when given), and the context
# is set to require and verify a peer certificate.
#
# Raises whatever File.read or OpenSSL raise for unreadable or invalid files.
def initialize_ssl_context
  @ssl_context = OpenSSL::SSL::SSLContext.new
  @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))

  # Tolerate a missing passphrase object instead of failing with NoMethodError;
  # nil is handed to OpenSSL as "no passphrase".
  passphrase = @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value
  @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key), passphrase)

  return unless @ssl_verify

  @cert_store = OpenSSL::X509::Store.new
  # Load the system default certificate path to the store
  @cert_store.set_default_paths
  unless @ssl_cacert.nil?
    if File.directory?(@ssl_cacert)
      @cert_store.add_path(@ssl_cacert)
    else
      @cert_store.add_file(@ssl_cacert)
    end
  end
  @ssl_context.cert_store = @cert_store
  @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
relp_stream(relpserver,socket,output_queue,client_address)
click to toggle source
# File lib/logstash/inputs/relp.rb, line 102
# Reads frames from one client until stop? is true, turning each frame into
# events on output_queue and acknowledging the frame afterwards.
#
# relpserver     - object answering syslog_read(socket) and ack(socket, txnr)
# socket         - the client connection the frames are read from
# output_queue   - receives every decoded event via <<
# client_address - stored in each event's "host" field
def relp_stream(relpserver, socket, output_queue, client_address)
  until stop?
    frame = relpserver.syslog_read(socket)

    @codec.decode(frame["message"]) do |decoded|
      decorate(decoded)
      decoded.set("host", client_address)

      # Only evaluated left to right, so the event is not queried unless SSL
      # with verification is active.
      wants_subject = @ssl_enable && @ssl_verify && decoded.get("ssl_context").nil?
      decoded.set("sslsubject", socket.peer_cert.subject) if wants_subject

      output_queue << decoded
    end

    #To get this far, the message must have made it into the queue for
    #filtering. I don't think it's possible to wait for output before ack
    #without fundamentally breaking the plugin architecture
    relpserver.ack(socket, frame['txnr'])
  end
end