class LogStash::Filters::Empow::Classifier

Constants

BATCH_TIMEOUT
MAX_CONCURRENT_REQUESTS

Public Class Methods

new(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries) click to toggle source
# File lib/logstash/filters/classifier.rb, line 16
# Builds the classifier and starts the single background thread that runs
# management_task in a loop for as long as the worker pool is running.
#
# @param online_classifer [Object] stored on the instance and handed to the bulk processor
# @param local_classifier [Object] consulted first by #classify; also handed to the bulk processor
# @param online_classification_workers [Object] forwarded to the bulk processor (presumably a worker count -- confirm)
# @param batch_size [Object] forwarded to the bulk processor
# @param batch_interval [Numeric] used by management_task to compute how long to wait on the request queue
# @param max_retries [Object] forwarded to the bulk processor
# @param time_between_queries [Numeric] minimum gap management_task keeps between retry passes; also forwarded to the bulk processor
def initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries)
        @logger ||= self.logger

        @logger.info("initializing classifier")

        @local_classifier = local_classifier
        @online_classifer = online_classifer
        @batch_interval = batch_interval
        @time_between_queries = time_between_queries

        # Must be assigned before the worker is started: management_task subtracts
        # this value from the current time on the worker thread, and a nil here
        # would make that subtraction raise.
        @last_action_time = Time.now

        @inflight_requests = Concurrent::Hash.new
        @new_request_queue = java.util.concurrent.ArrayBlockingQueue.new(MAX_CONCURRENT_REQUESTS)

        @bulk_processor = Classification::BulkProcessor.new(max_retries, batch_size, time_between_queries, @inflight_requests, online_classifer, local_classifier, online_classification_workers)

        @worker_pool = Concurrent::FixedThreadPool.new(1)

        @worker_pool.post do
                while @worker_pool.running?
                        begin
                                management_task
                        rescue StandardError => e
                                # keep the loop alive: one failed iteration must not stop the worker
                                @logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace)
                        end
                end
        end

        @logger.debug("classifier initialized")
end

Public Instance Methods

classify(request) click to toggle source
# File lib/logstash/filters/classifier.rb, line 97
# Asks the local classifier for a result and, when it has none, schedules
# the request for online classification.
#
# @param request [Object, nil] the classification request
# @return [Object, nil] the local classifier's answer; nil when +request+ is nil
#   or when the local classifier returned nil (an online lookup is requested then)
def classify(request)
        return nil if request.nil?

        cached = @local_classifier.classify(request)
        @logger.trace("cached result", :request => request, :res => cached)

        if cached.nil?
                # local miss: hand the request over for online classification, answer nil for now
                request_online_classifiction(request)
                nil
        else
                cached
        end
end
close() click to toggle source
# File lib/logstash/filters/classifier.rb, line 48
# Shuts the classifier down: drops every in-flight request, closes the bulk
# processor, then kills the worker pool and waits for it to terminate.
def close
        # timeout handed to the pool's wait_for_termination
        termination_timeout = 5

        @logger.info("shutting down empow's classifcation plugin")

        @inflight_requests.clear
        @bulk_processor.close

        pool = @worker_pool
        pool.kill
        pool.wait_for_termination(termination_timeout)

        @logger.info("empow classifcation plugin closed")
end

Private Instance Methods

create_task(request) click to toggle source
# File lib/logstash/filters/classifier.rb, line 130
# Wraps a request in the bookkeeping hash kept for in-flight requests.
#
# @param request [Object] the classification request
# @return [Hash] with keys :retries (0), :request and :last_executed
def create_task(request)
        {
                :retries => 0,
                :request => request,
                # fixed epoch timestamp far in the past (1979); presumably so a new
                # task looks long overdue to whatever inspects it -- confirm with the consumer
                :last_executed => Time.at(310953600)
        }
end
management_task() click to toggle source
# File lib/logstash/filters/classifier.rb, line 62
# One iteration of the background loop: waits on the new-request queue for
# whatever remains of the batch interval, then either adds the dequeued
# request to the current batch or, when nothing arrived, flushes the batch.
# Afterwards it triggers a retry pass, but only when at least
# @time_between_queries has passed since the previous one.
# Any StandardError is logged and swallowed.
def management_task
        started_at = Time.now

        since_last_batch = (started_at - @bulk_processor.get_last_execution_time).round

        # rounding (or an overdue batch) can push the remaining wait below zero
        remaining = @batch_interval - since_last_batch
        wait_seconds = remaining < 0 ? 0 : remaining

        incoming = begin
                @new_request_queue.poll(wait_seconds, TimeUnit::SECONDS)
        rescue java.lang.InterruptedException
                # an interrupted wait is handled like an empty poll
                nil
        end

        if incoming.nil?
                # nothing arrived: this is a 'tick'
                @bulk_processor.flush_current_batch
        else
                @bulk_processor.add_to_batch(incoming)
        end

        # run the retry pass only when the timer has expired
        unless started_at - @last_action_time < @time_between_queries
                @last_action_time = started_at
                @bulk_processor.retry_queued_requests
        end
rescue StandardError => e
        @logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace)
end
request_online_classifiction(req) click to toggle source
# File lib/logstash/filters/classifier.rb, line 112
# Registers a request as in flight and offers it to the new-request queue.
# Does nothing when the request already has an in-flight entry. When the
# queue refuses the request a warning is logged; the in-flight entry is kept.
#
# @param req [Object] the classification request
def request_online_classifiction(req)
        # an existing entry means the request was already submitted
        return unless @inflight_requests[req].nil?

        @logger.debug("adding request to online classification queue", :request => req)

        # record the request as in progress before it is queued
        @inflight_requests[req] = create_task(req)

        accepted = @new_request_queue.offer(req)
        @logger.warn("queue full, request reject", :request => req) unless accepted
end