class LogStash::Filters::Empow::Classification::BulkProcessor

Constants

BATCH_TIMEOUT
ERROR_TTL_SECS
THREAD_IDLE_TIME

Public Class Methods

new(max_retries, batch_size, sec_between_attempts, requests_queue, online_classifer, local_classifier, max_concurrent_threads)
# File lib/logstash/filters/classifier.rb, line 150
#
# Creates a bulk processor that groups classification requests into batches
# and hands them to a pool of worker threads for online classification.
#
# @param max_retries [Integer] retry count at which a queued request is dropped
# @param batch_size [Integer] number of requests that makes a batch "full"
# @param sec_between_attempts [Numeric] minimum delay before a queued request is retried
# @param requests_queue hash-like store of pending requests (request => bookkeeping hash)
# @param online_classifer remote classifier; must respond to #classify(batch)
# @param local_classifier cache/db writer used to persist results
# @param max_concurrent_threads [Integer] upper bound for the worker pool
def initialize(max_retries, batch_size, sec_between_attempts, requests_queue, online_classifer, local_classifier, max_concurrent_threads)
  @logger = self.logger unless @logger

  # retry / batching policy
  @max_retries = max_retries
  @max_batch_size = batch_size
  @sec_between_attempts = sec_between_attempts

  # collaborators
  @requests_queue = requests_queue
  @online_classifer = online_classifer
  @local_classifier = local_classifier

  pool_options = {
    min_threads: 1,
    max_threads: max_concurrent_threads,
    idletime: THREAD_IDLE_TIME
  }
  @online_classification_workers = Concurrent::ThreadPoolExecutor.new(pool_options)

  # start with an empty batch whose timeout window begins now
  clear_batch(Time.now)
end

Public Instance Methods

add_to_batch(request)
# File lib/logstash/filters/classifier.rb, line 172
#
# Appends a request to the current batch and then gives the batch a chance
# to be dispatched (see #flush_current_batch).
#
# @param request the request key to classify
# @return whatever #flush_current_batch returns
def add_to_batch(request)
  @current_batch.push(request)
  @current_batch_size += 1

  flush_current_batch
end
close()
# File lib/logstash/filters/classifier.rb, line 166
#
# Shuts the worker pool down via #kill and then waits up to 10 seconds
# for its threads to terminate.
#
# @return the result of the pool's #wait_for_termination
def close
  pool = @online_classification_workers
  pool.kill
  pool.wait_for_termination(10)
end
flush_current_batch()
# File lib/logstash/filters/classifier.rb, line 181
#
# Dispatches the current batch to the worker pool when it is full or when it
# has been waiting longer than BATCH_TIMEOUT seconds, then starts a new batch.
# When the batch is empty, the timeout window is simply re-anchored at "now".
#
# The worker task logs (rather than silently loses) any exception raised
# while classifying, since exceptions escaping a posted task are otherwise
# not reported at a useful level.
#
# @return the new last-execution time, or nil when nothing happened
def flush_current_batch
  current_time = Time.now

  if @current_batch_size.zero?
    # nothing pending: keep the timeout window anchored at the present
    @last_execution_time = current_time
  elsif @current_batch_size >= @max_batch_size ||
        (current_time - @last_execution_time) > BATCH_TIMEOUT
    # capture the batch in locals; clear_batch below swaps in a new array,
    # so the worker keeps its own reference to this one
    batch = @current_batch
    bulk_size = @current_batch_size

    @online_classification_workers.post do
      begin
        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        classify_online(batch)
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

        @logger.debug("response received", :bulk_size => bulk_size, :time => elapsed)
      rescue StandardError => e
        @logger.error("failed to classify bulk request", :bulk_size => bulk_size, :error => e, :backtrace => e.backtrace)
      end
    end

    clear_batch(current_time)
  end
end
get_last_execution_time()
# File lib/logstash/filters/classifier.rb, line 207
#
# @return [Time, nil] the time the current batch's timeout window started
def get_last_execution_time
  @last_execution_time
end
retry_queued_requests()
# File lib/logstash/filters/classifier.rb, line 219
#
# Re-submits queued requests whose last attempt is at least
# @sec_between_attempts seconds old, in slices of at most @max_batch_size,
# and finally drops every request that has reached @max_retries.
#
# Each slice is handed to its worker as an explicit argument. Capturing a
# reassigned local in the worker block would let a deferred worker see a
# later batch instead of its own.
#
# @return the result of the final delete_if on @requests_queue
def retry_queued_requests
  @logger.debug("retrying queued requests")

  current_time = Time.now
  due = []

  @requests_queue.each do |request, task|
    # too soon since the previous attempt
    next if task[:last_executed] + @sec_between_attempts > current_time

    # stamp now so a subsequent call does not pick the request up again
    # before the worker runs
    task[:last_executed] = current_time
    # NOTE(review): classify_online also increments :retries for the same
    # attempt, so a retry seems to count twice toward @max_retries — confirm intent.
    task[:retries] = task[:retries] + 1

    due << request
  end

  # assumes a positive batch size; guard so each_slice never receives 0
  slice_size = [@max_batch_size, 1].max

  due.each_slice(slice_size) do |chunk|
    @online_classification_workers.post(chunk) do |requests|
      begin
        classify_online(requests)
      rescue StandardError => e
        @logger.error("failed to retry bulk request", :bulk_size => requests.size, :error => e, :backtrace => e.backtrace)
      end
    end
  end

  # remove requests that were in the queue for too long
  @requests_queue.delete_if { |_request, task| task[:retries] >= @max_retries }
end

Private Instance Methods

classify_online(bulk_request)
# File lib/logstash/filters/classifier.rb, line 261
#
# Sends one bulk request to the online classifier and records the outcome.
#
# Requests that are no longer present in @requests_queue (resolved by an
# earlier worker) are left out of the bulk. The remaining ones get their
# :last_executed / :retries bookkeeping updated before the remote call.
#
# - If the classifier raises, every request of the bulk is removed from the queue.
# - If the number of results does not match the number of requests, a warning
#   is logged and nothing is stored.
# - Otherwise each (request, response) pair is written through @local_classifier
#   (cache + db on success, cache only on failure), and requests whose response
#   is final are removed from the queue.
#
# @param bulk_request [Array] request keys as used in @requests_queue
def classify_online(bulk_request)
  current_time = Time.now
  batch = []

  bulk_request.each do |req|
    task = @requests_queue[req]

    next if task.nil? # resolved by an earlier thread

    task[:last_executed] = current_time
    task[:retries] = task[:retries] + 1

    batch << req
  end

  # every request was already resolved: no point in a remote round-trip
  return if batch.empty?

  begin
    results = @online_classifer.classify(batch)
  rescue StandardError => e
    @logger.debug("bulk request ended with a failure, all requests will be removed from queue", :error => e, :backtrace => e.backtrace)

    batch.each { |req| @requests_queue.delete(req) }
    # there are no results to process after a failure
    return
  end

  result_count = results.nil? ? 0 : results.size
  if result_count != batch.size
    @logger.warn("response array isn't the same size as request array. requests: #{batch.size}. results: #{result_count}")
    return
  end

  # NOTE(review): elements are destructured as (request, response), so results
  # is presumably a Hash or an array of pairs — confirm against the classifier.
  results.each do |request, res|
    @logger.debug("processing response", :request => request, :response => res)

    begin
      expiration_time = Time.now + get_response_ttl(res)

      if res.is_successful
        # validate the response if needed
        # put the result in memory and in the local db
        @local_classifier.save_to_cache_and_db(request, res, expiration_time)
      else
        @local_classifier.add_to_cache(request, res, expiration_time) # log the failed result for tagging
      end
    rescue StandardError => e
      @logger.error("encountered an error while trying to process result", :request => request, :error => e, :backtrace => e.backtrace)
    end

    if res.is_final # in case of anti-malware, the result may change till the classification process is done
      @requests_queue.delete(request)
    end
  end
end
clear_batch(current_time)
# File lib/logstash/filters/classifier.rb, line 212
#
# Starts a fresh, empty batch whose timeout window begins at +current_time+.
# A new array is allocated, so a worker still holding the previous batch is
# unaffected.
#
# @param current_time [Time] start of the new batch window
# @return [Time] +current_time+
def clear_batch(current_time)
  @current_batch_size = 0
  @current_batch = []
  @last_execution_time = current_time
end
get_response_ttl(res)
# File lib/logstash/filters/classifier.rb, line 317
#
# Returns how many seconds a classification response should stay cached.
#
# Unsuccessful responses always use ERROR_TTL_SECS. Successful responses use
# the 'ttlseconds' entry of the response body when it is a non-negative
# number. In every other case (no body, no entry, negative or non-numeric
# value) +default_ttl+ is returned.
#
# @param res response object exposing #is_successful and #response
# @param default_ttl [Numeric] fallback TTL in seconds (60, as before)
# @return [Numeric] TTL in seconds
def get_response_ttl(res, default_ttl = 60)
  return ERROR_TTL_SECS unless res.is_successful

  body = res.response
  ttl = body.nil? ? nil : body['ttlseconds']

  # Reject nil/non-numeric values as well as negative ones: a String here
  # would make the `<` comparison (and later Time arithmetic) raise.
  return default_ttl unless ttl.is_a?(Numeric) && ttl >= 0

  ttl
end