class LogStash::Filters::Empow::PluginLogic
Public Class Methods
new(classifer, field_handler, max_parking_time, max_parked_events, tag_on_timeout, tag_on_error)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 12
# Wires the plugin logic to its collaborators and settings.
#
# classifer         - object used via #classify(request) and #close
# field_handler     - object used via #event_to_classification_request(event)
# max_parking_time  - limit compared against the time an event has been parked
#                     (a Time difference, i.e. seconds)
# max_parked_events - parked-event count above which #classify releases the oldest one
# tag_on_timeout    - tags applied to events released without a final classification
# tag_on_error      - tags applied to events whose classification failed
def initialize(classifer, field_handler, max_parking_time, max_parked_events, tag_on_timeout, tag_on_error)
  @logger ||= self.logger

  # collaborators
  @classifer = classifer
  @field_handler = field_handler

  # parking limits
  @max_parking_time = max_parking_time
  @max_parked_events = max_parked_events

  # tags
  @tag_on_timeout = tag_on_timeout
  @tag_on_error = tag_on_error

  # events waiting for a final classification, kept as { :event, :time } hashes
  @parked_events = Concurrent::Array.new
end
Public Instance Methods
classify(event)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 31
# Attempts to classify +event+ immediately.
#
# Returns the event itself when it could not be turned into a classification
# request (after applying the error tags) or when a final classification was
# obtained. Otherwise the event is parked; if parking pushes the number of
# parked events above the configured maximum, the oldest parked event is
# un-cancelled and returned instead. Returns nil when nothing is released.
def classify(event)
  request = @field_handler.event_to_classification_request(event)

  if request.nil?
    @tag_on_error.each { |tag| event.tag(tag) }
    return event
  end

  return event if classify_event(request, event)

  park(event)
  return nil unless @parked_events.length > @max_parked_events

  # over capacity: hand the oldest parked event back to the stream
  oldest = @parked_events.shift
  return nil if oldest.nil?

  released = oldest[:event]
  released.uncancel
  released
end
close()
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 27
# Closes the underlying classifier and returns whatever its #close returns.
def close
  @classifer.close
end
flush(options = {})
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 58
# Collects parked events that can be handed back to the stream.
#
# options - Hash; when options[:final] is truthy every parked event is
#           drained unconditionally, otherwise each parked event is offered
#           to #process_parked_event, which decides whether it leaves the
#           parking area.
#
# Returns an Array of the released events (possibly empty).
def flush(options = {})
  events_to_flush = []

  if options[:final]
    # "final flush" special event: drain everything that is still parked
    loop do
      tuple = @parked_events.shift
      break unless tuple

      events_to_flush << tuple[:event]
    end
  else
    @parked_events.delete_if { |tuple| process_parked_event(tuple, events_to_flush) }
  end

  events_to_flush
end
Private Instance Methods
classify_event(request, event)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 108
# Asks the classifier about +request+ and, when the answer is a valid (final)
# classification, applies it to +event+.
#
# Returns true when the event was tagged with a final classification,
# false otherwise.
def classify_event(request, event)
  classification = @classifer.classify(request)
  return false unless is_valid_classification(classification)

  tag_event(classification, event)
  true
end
is_valid_classification(classification)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 120
# A classification is usable only when it exists and reports itself as final.
#
# Returns false for nil, otherwise the value of classification.is_final.
def is_valid_classification(classification)
  return false if classification.nil?

  classification.is_final
end
park(event)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 150
# Puts +event+ in the parking area together with the time it was parked, then
# cancels it so it is not streamed yet.
#
# Returns the result of event.cancel.
def park(event)
  @parked_events << { :event => event, :time => Time.now }
  event.cancel # don't stream this event just yet ...
end
parking_time_expired(tuple)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 161
# Tells whether the parked +tuple+ has been waiting longer than the configured
# maximum parking time.
#
# tuple - Hash whose :time entry is the Time at which the event was parked.
def parking_time_expired(tuple)
  elapsed = Time.now - tuple[:time]
  elapsed > @max_parking_time
end
process_parked_event(tuple, events_to_flush)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 75
# Re-examines one parked event and decides whether it leaves the parking area.
#
# tuple           - Hash with :event (the parked event) and :time (when it was parked)
# events_to_flush - Array that receives every event released by this call
#
# The event is released when a final classification is available or when its
# parking time has expired; in the latter case the timeout tags are applied.
# If classification or tagging raises a StandardError, the error is logged and
# the event is tagged with the error tags and still released, so it is not
# silently dropped once it is removed from the parking area.
#
# Returns true when the event must be removed from the parked list (it has been
# appended to +events_to_flush+), false when it should stay parked.
def process_parked_event(tuple, events_to_flush)
  event = tuple[:event]
  request = @field_handler.event_to_classification_request(event)

  begin
    res = @classifer.classify(request)

    return false unless parking_time_expired(tuple) || is_valid_classification(res)

    tag_event(res, event)

    # if we're releasing this event based on time expiration, tag it with timeout
    @tag_on_timeout.each { |tag| event.tag(tag) } if res.nil? || !res.is_final

    events_to_flush << event
    true
  rescue StandardError => e
    @logger.error("an error occured while processing event, event flushed backed to the stream", :request => request, :exception => e.message, :backtrace => e.backtrace)

    # Returning true removes the tuple from the parked list, so the event has to
    # be handed back here or it would be lost.
    @tag_on_error.each { |tag| event.tag(tag) }
    events_to_flush << event
    true
  end
end
tag_event(classification, event)
click to toggle source
# File lib/logstash/filters/plugin-logic.rb, line 125
# Copies a classification result onto +event+.
#
# classification - object answering #response and #is_successful; nil is ignored
# event          - event answering #set(field, value) and #tag(tag)
#
# Every key/value pair found under the "response" key of the classification's
# response body is written to "[empow_classification_response][<key>]". When
# the classification is not successful the error tags are applied and, if a
# response body exists, it is passed to Utils.add_error together with the event.
#
# A nil response body is tolerated: nothing is copied, but an unsuccessful
# classification is still tagged as an error.
def tag_event(classification, event)
  return if classification.nil?

  response_body = classification.response

  @logger.debug("classification response", :classification => response_body)

  # the body may be missing entirely (it is nil-checked below as well), so guard the lookup
  response = response_body.nil? ? nil : response_body["response"]

  if !response.nil? && response.size > 0
    response.each do |k, v|
      event.set("[empow_classification_response][#{k}]", v)
    end
  end

  return if classification.is_successful

  @tag_on_error.each { |tag| event.tag(tag) }

  LogStash::Filters::Empow::Utils.add_error(event, response_body) unless response_body.nil?
end