class RelpServer
Public Class Methods
new(host,port,required_commands=[],ssl_context=nil)
click to toggle source
# File lib/logstash/util/relp.rb, line 104
# Opens the listening socket for a RELP server.
#
# host              - address handed to TCPServer.new
# port              - port handed to TCPServer.new
# required_commands - stored as the commands a client must offer (default: [])
# ssl_context       - when given, the TCP listener is wrapped in an
#                     OpenSSL::SSL::SSLServer built with this context
#
# Logs and re-raises Errno::EADDRINUSE when the address is already bound.
def initialize(host, port, required_commands = [], ssl_context = nil)
  @logger = ::Cabin::Channel.get(LogStash)

  # Starts out as plain `true`; replaced by the listener once binding succeeds.
  @server = true

  # These are things that are part of the basic protocol, but only valid in
  # one direction (rsp, close etc.)
  # TODO: check for others
  @basic_relp_commands = ['close']

  # These are extra commands that we require, otherwise refuse the connection
  @required_relp_commands = required_commands

  begin
    @server = TCPServer.new(host, port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start RELP server: Address in use", :host => host, :port => port)
    raise
  end

  @server = OpenSSL::SSL::SSLServer.new(@server, ssl_context) if ssl_context

  flavour = ssl_context ? 'SSL-enabled ' : ''
  @logger.info("Started #{flavour}RELP Server", :host => host, :port => port)
end
Public Instance Methods
accept()
click to toggle source
# File lib/logstash/util/relp.rb, line 128
# Waits on the listener for the next connection.
#
# Returns a two-element array: this server object and the accepted socket.
def accept
  client = @server.accept
  @logger.debug("New socket created")
  [self, client]
end
ack(socket, txnr)
click to toggle source
# File lib/logstash/util/relp.rb, line 210
# Writes a '200 OK' rsp frame for the given transaction number.
#
# socket - connection the response is written to
# txnr   - transaction number copied into the response frame
#
# Returns whatever frame_write returns.
def ack(socket, txnr)
  ok_frame = {
    'txnr'    => txnr,
    'command' => 'rsp',
    'message' => '200 OK'
  }
  self.frame_write(socket, ok_frame)
end
relp_setup_connection(socket)
click to toggle source
# File lib/logstash/util/relp.rb, line 134
# Reads the first frame from a new connection and answers the client's
# 'open' offer.
#
# socket - connection to read the offer from and write the response to
#
# Returns whatever frame_write returns for the '200 OK' response.
#
# Raises InappropriateCommand when the first frame is not 'open'.
# Raises RelpError when the offer carries no relp_version.
# Raises InsufficientCommands when the offer lacks a required command.
# In every failure case serverclose(socket) is called before raising.
def relp_setup_connection(socket)
  frame = self.frame_read(socket)

  unless frame['command'] == 'open'
    self.serverclose(socket)
    raise InappropriateCommand, "#{frame['command']} expecting open"
  end

  # One "name=value" pair per line of the offer; to_s keeps a frame without
  # any message from crashing the parse (it then fails the version check).
  offer = Hash[frame['message'].to_s.scan(/^(.*)=(.*)$/)]

  if offer['relp_version'].nil?
    @logger.warn("No relp version specified")
    # if no version specified, relp spec says we must close connection
    self.serverclose(socket)
    raise RelpError, 'No relp_version specified'
  end

  # An offer without a "commands=" line offers nothing; treat it as an empty
  # list instead of calling split on nil.
  offered = offer['commands'].to_s.split(',')
  # Array subtraction leaves exactly the required commands the client lacks.
  missing = @required_relp_commands - offered

  unless missing.empty?
    @logger.warn("Not all required commands are available", :required => @required_relp_commands, :offer => offer['commands'])
    # Tell them why we're closing the connection:
    refusal = {
      'txnr'    => frame['txnr'],
      'command' => 'rsp',
      'message' => "500 Required command(s) #{missing.join(',')} not offered"
    }
    self.frame_write(socket, refusal)
    self.serverclose(socket)
    raise InsufficientCommands, "#{offer['commands']} offered, require #{@required_relp_commands.join(',')}"
  end

  # attempt to set up connection
  acceptance = {
    'txnr'    => frame['txnr'],
    'command' => 'rsp',
    # TODO: optional commands
    'message' => "200 OK relp_version=#{RelpVersion}\n" \
                 "relp_software=#{RelpSoftware}\n" \
                 "commands=#{@required_relp_commands.join(',')}"
  }
  self.frame_write(socket, acceptance)
end
serverclose(socket)
click to toggle source
# File lib/logstash/util/relp.rb, line 195
# Sends a 'serverclose' frame (txnr 0) and closes the socket.
#
# socket - connection to notify and close
#
# A ConnectionClosed raised while writing the frame is swallowed. The socket
# is closed in every case, including when the write fails, so the local
# descriptor is not left open. Errors from close itself are ignored.
#
# Returns nil.
def serverclose(socket)
  frame = { 'txnr' => 0, 'command' => 'serverclose' }
  begin
    self.frame_write(socket, frame)
  rescue ConnectionClosed
    # The peer is already gone; there is nobody left to notify.
  ensure
    begin
      socket.close
    rescue StandardError
      # Best effort: the socket may already be closed.
    end
  end
  nil
end
shutdown()
click to toggle source
# File lib/logstash/util/relp.rb, line 206
# Closes the listening socket, ignoring any StandardError raised by close.
def shutdown
  begin
    @server.close
  rescue StandardError
    nil
  end
end
syslog_read(socket)
click to toggle source
Reads the next frame and returns it without acknowledging it.
# File lib/logstash/util/relp.rb, line 176
# Reads one frame and returns it when it is a 'syslog' frame. The frame is
# not acknowledged here.
#
# socket - connection to read from
#
# Raises ConnectionClosed after answering a client 'close' frame with an rsp
# frame and calling serverclose.
# Raises InappropriateCommand (after serverclose) for any other command.
def syslog_read(socket)
  incoming = self.frame_read(socket)

  case incoming['command']
  when 'syslog'
    incoming
  when 'close'
    # the client is closing the connection, acknowledge the close and act on it
    reply = { 'txnr' => incoming['txnr'], 'command' => 'rsp' }
    self.frame_write(socket, reply)
    self.serverclose(socket)
    raise ConnectionClosed
  else
    # the client is trying to do something unexpected
    self.serverclose(socket)
    raise InappropriateCommand, incoming['command'] + ' expecting syslog'
  end
end