class LogStash::Filters::EmpowClassifier

Constants

CACHE_TTL
CLASSIFICATION_URL

Public Instance Methods

filter(event) { |res| ... } click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 184
       # Runs a single event through the plugin core and reacts to the outcome.
       #
       # Three results of @plugin_core.classify are handled:
       # * nil - nothing is marked or yielded and the method returns at once;
       # * the very object that was passed in - it is marked with filter_matched;
       # * a different object (a previously parked "overflow" event) - it is
       #   marked with filter_matched and handed to the caller's block.
       #
       # Any StandardError raised along the way is logged, and the incoming event
       # receives every tag listed in @tag_on_error.
       #
       # @param event the event to classify
       # @yield [res] a parked event released by the plugin core
       def filter(event)
         classified = @plugin_core.classify(event)

         return if classified.nil?

         # identity (not equality) check: the core handed back the same object
         if classified.equal?(event)
           filter_matched(event)
           return
         end

         # a different object came back, i.e. a parked event
         filter_matched(classified)
         @logger.debug("filter matched for overflow event", :event => classified)

         yield classified
       rescue StandardError => error
         @logger.error("encountered an exception while classifying", :error => error, :event => event, :backtrace => error.backtrace)

         @tag_on_error.each { |tag| event.tag(tag) }
       end
flush(options = {}) click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 161
       # Collects the events that the plugin core releases on flush.
       #
       # Each event returned by @plugin_core.flush is uncancelled and appended to
       # the result. If a StandardError occurs, it is logged and whatever has been
       # collected up to that point is still returned.
       #
       # @param options passed through unchanged to @plugin_core.flush
       # @return [Array] the uncancelled events, possibly empty
       def flush(options = {})
         @logger.debug("entered flush")

         released = []

         begin
           # append one by one so a failure mid-way keeps the events already handled
           @plugin_core.flush(options).each do |parked|
             parked.uncancel
             released.push(parked)
           end
         rescue StandardError => error
           @logger.error("encountered an exception while processing flush", :error => error)
         end

         @logger.debug("flush ended", :flushed_event_count => released.size)

         released
       end
register() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 110
# Prepares the plugin for use: validates the settings, then wires the local
# classifier, the classification-center client, the combined classifier and
# the field handler into the PluginLogic instance kept in @plugin_core.
#
# Raises whatever validate_params raises when a setting is invalid.
def register
  @logger.info('registering empow classifcation plugin')

  validate_params

  # may be nil, in which case the local classifier gets no persisted store
  persisted_cache = create_local_database

  offline_classifier = LogStash::Filters::Empow::LocalClassifier.new(
    @cache_size,
    CACHE_TTL,
    @async_local_cache,
    persisted_cache
  )

  service_url = get_effective_url
  center_client = LogStash::Filters::Empow::ClassificationCenterClient.new(
    @username,
    @password,
    @authentication_hash,
    service_url
  )

  classifier = LogStash::Filters::Empow::Classifier.new(
    center_client,
    offline_classifier,
    @max_classification_center_workers,
    @bulk_request_size,
    @bulk_request_interval,
    @max_query_retries,
    @time_between_queries
  )

  fields = LogStash::Filters::Empow::FieldHandler.new(
    @product_type_field,
    @product_name_field,
    @threat_field,
    @src_internal_field,
    @dst_internal_field
  )

  # ||= leaves an already assigned core in place
  @plugin_core ||= LogStash::Filters::Empow::PluginLogic.new(
    classifier,
    fields,
    @pending_request_timeout,
    @max_pending_requests,
    @tag_on_timeout,
    @tag_on_error
  )

  @logger.info('empow classifcation plugin registered')
end

Private Instance Methods

close() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 149
# Shuts the plugin down by closing @plugin_core, logging before and after.
def close
  @logger.info('closing the empow classifcation plugin')
  @plugin_core.close
  @logger.info('empow classifcation plugin closed')
end
create_local_database() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 215
        # Builds the persisted key/value store used as a local cache.
        #
        # Returns nil when @elastic_hosts is not set, or when constructing the
        # store raises a StandardError (the failure is logged, not propagated).
        #
        # The store is built from the configured values. Passing the literal
        # symbols :elastic_hosts, :elastic_user, ... would hand it setting names
        # instead of settings.
        # NOTE(review): @elastic_user, @elastic_password and @elastic_index are
        # assumed to be the settings matching @elastic_hosts - confirm against the
        # plugin's config declarations.
        #
        # @return the PersistentKeyValueDB instance, or nil
        def create_local_database
          # if no elastic host has been configured, no local db should be used
          if @elastic_hosts.nil?
            @logger.info("no local persisted cache is configured")
            return nil
          end

          LogStash::Filters::Empow::PersistentKeyValueDB.new(@elastic_hosts, @elastic_user, @elastic_password, @elastic_index)
        rescue StandardError => e
          # pass the exception in a hash, like the other logger calls in this plugin
          @logger.error("caught an exception while trying to configured persisted cache", :error => e)
          nil
        end
get_effective_url() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 132
# Resolves the URL of the classification service.
#
# Returns the configured @base_url when it holds a non-blank string, and
# falls back to CLASSIFICATION_URL when it is nil, empty or only whitespace.
#
# @return [String] the URL to use
def get_effective_url
  return CLASSIFICATION_URL if @base_url.nil? || @base_url.strip.empty?

  @base_url
end
periodic_flush() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 157
# Always answers true.
# NOTE(review): presumably this tells the hosting pipeline to call flush
# on a timer - confirm against the Logstash filter base class.
def periodic_flush
  true
end
validate_params() click to toggle source
# File lib/logstash/filters/empowclassifier.rb, line 141
# Checks the settings that can be verified up front.
#
# The checks run in this order and the first failing one raises:
# 1. @threat_field must not be blank (per Utils.is_blank_string);
# 2. @bulk_request_size must lie within 1..1000;
# 3. @bulk_request_interval must be at least 1.
#
# @raise [ArgumentError] when one of the checks fails
def validate_params
  if LogStash::Filters::Empow::Utils.is_blank_string(@threat_field)
    raise ArgumentError, 'threat field cannot be empty'
  end

  if @bulk_request_size < 1 || @bulk_request_size > 1000
    raise ArgumentError, 'bulk_request_size must be an positive number between 1 and 1000'
  end

  if @bulk_request_interval < 1
    raise ArgumentError, 'bulk_request_interval must be an greater or equal to 1'
  end
end