class LogStash::Filters::Empow::Classification::BulkProcessor
Constants
- BATCH_TIMEOUT
- ERROR_TTL_SECS
- THREAD_IDLE_TIME
Public Class Methods
new(max_retries, batch_size, sec_between_attempts, requests_queue, online_classifer, local_classifier, max_concurrent_threads)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 150
#
# Creates a bulk processor that groups classification requests into batches
# and sends them to the online classifier on a thread pool.
#
# @param max_retries [Integer] retry count at which retry_queued_requests evicts a request
# @param batch_size [Integer] number of requests that fills a batch
# @param sec_between_attempts [Numeric] minimum seconds between retries of one request
# @param requests_queue [Hash] pending requests, keyed by request
# @param online_classifer remote classifier; must respond to #classify(batch)
# @param local_classifier cache/db; must respond to #save_to_cache_and_db and #add_to_cache
# @param max_concurrent_threads [Integer] upper bound of the worker pool
def initialize(max_retries, batch_size, sec_between_attempts, requests_queue, online_classifer, local_classifier, max_concurrent_threads)
  @logger ||= self.logger

  # retry / batching policy
  @max_retries = max_retries
  @max_batch_size = batch_size
  @sec_between_attempts = sec_between_attempts

  # collaborators
  @requests_queue = requests_queue
  @online_classifer = online_classifer
  @local_classifier = local_classifier

  # at least one worker is kept; extra workers are reaped after THREAD_IDLE_TIME
  @online_classification_workers = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: max_concurrent_threads,
    idletime: THREAD_IDLE_TIME
  )

  # start with an empty batch whose timeout clock begins now
  clear_batch(Time.now)
end
Public Instance Methods
add_to_batch(request)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 172
#
# Appends a request to the current batch, then lets flush_current_batch decide
# whether the batch is ready to be sent.
#
# @param request the request key to classify
def add_to_batch(request)
  @current_batch_size += 1
  @current_batch.push(request)

  flush_current_batch
end
close()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 166
#
# Stops the worker pool: kills it, then blocks for at most 10 seconds
# waiting for it to terminate.
#
# @return the result of the pool's #wait_for_termination
def close
  workers = @online_classification_workers
  workers.kill

  grace_period_secs = 10
  workers.wait_for_termination(grace_period_secs)
end
flush_current_batch()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 181
#
# Sends the current batch to a worker when it is full, or when it is non-empty
# and more than BATCH_TIMEOUT seconds have passed since @last_execution_time.
# When the batch is empty, only the timeout clock is reset.
def flush_current_batch
  now = Time.now
  ready = @current_batch_size == @max_batch_size ||
          (@current_batch_size > 0 && (now - @last_execution_time) > BATCH_TIMEOUT)

  if ready
    # capture into locals that are never reassigned, so the worker sees this batch
    pending = @current_batch
    pending_size = @current_batch_size

    @online_classification_workers.post do
      started = Time.now
      classify_online(pending)
      @logger.debug("response received", :bulk_size => pending_size, :time => Time.now - started)
    end

    clear_batch(now)
  elsif @current_batch_size.zero?
    @last_execution_time = now
  end
end
get_last_execution_time()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 207
#
# @return [Time] the value of @last_execution_time, which is set by clear_batch
#   and by flush_current_batch when it finds the batch empty
def get_last_execution_time
  @last_execution_time
end
retry_queued_requests()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 219
#
# Re-submits queued requests whose back-off (@sec_between_attempts) has elapsed,
# in batches of at most @max_batch_size. It then evicts every request whose
# :retries count has reached @max_retries.
#
# The attempt counter (:retries) is incremented by classify_online when the
# request is actually sent. Counting it here as well would make each retry
# count twice.
def retry_queued_requests
  @logger.debug("retrying queued requests")

  current_time = Time.now
  batch = []

  @requests_queue.each do |key, task|
    # still inside the back-off window
    next if task[:last_executed] + @sec_between_attempts > current_time

    batch << key
    # stamp now so that a later retry pass does not resubmit the request
    # before the worker has picked it up
    task[:last_executed] = current_time

    next if batch.size < @max_batch_size

    # Pass the batch as an argument: a block closing over the local `batch`
    # would see the reassignment below if the worker runs late.
    @online_classification_workers.post(batch) { |b| classify_online(b) }
    batch = []
  end

  @online_classification_workers.post(batch) { |b| classify_online(b) } unless batch.empty?

  # remove requests that were in the queue for too long
  @requests_queue.delete_if { |_key, task| task[:retries] >= @max_retries }
end
Private Instance Methods
classify_online(bulk_request)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 261
#
# Sends one bulk request to the online classifier and applies the results.
#
# Requests no longer present in @requests_queue (already resolved by another
# worker) are skipped. Every remaining request has this attempt recorded
# (:last_executed and :retries). If the classifier raises, all requests of the
# batch are removed from the queue.
#
# Results are iterated as (request, response) pairs (presumably a Hash keyed by
# request; TODO confirm against the classifier). Successful responses are
# written to the cache and db, and failed ones only to the cache. Each request
# is dropped from the queue once its response reports is_final.
#
# @param bulk_request [Array] request keys into @requests_queue
def classify_online(bulk_request)
  current_time = Time.now

  batch = []
  bulk_request.each do |req|
    task = @requests_queue[req]
    next if task.nil? # resolved by an earlier thread

    task[:last_executed] = current_time
    task[:retries] = task[:retries] + 1
    batch << req
  end

  # nothing left to classify; skip the remote call entirely
  return if batch.empty?

  begin
    results = @online_classifer.classify(batch)
  rescue StandardError => e
    @logger.debug("bulk request ended with a failure, all requests will be removed from queue", :error => e, :backtrace => e.backtrace)
    batch.each { |req| @requests_queue.delete(req) }
    # results is unusable after a failure; stop here instead of crashing on nil
    return
  end

  result_count = results.nil? ? 0 : results.size
  if result_count != batch.size
    @logger.warn("response array isn't the same size as result array. requests: #{batch.size}. results: #{result_count}")
    return
  end

  results.each do |request, res|
    @logger.debug("processing response", :request => request, :response => res)

    begin
      expiration_time = Time.now + get_response_ttl(res)

      if res.is_successful
        # validate the response if needed
        # put the result in memory and in the local db
        @local_classifier.save_to_cache_and_db(request, res, expiration_time)
      else
        # log the failed result for tagging
        @local_classifier.add_to_cache(request, res, expiration_time)
      end
    rescue StandardError => e
      @logger.error("encountered an error while trying to process result", :request => request, :error => e, :backtrace => e.backtrace)
    end

    # in case of anti-malware, the result may change till the classification process is done
    @requests_queue.delete(request) if res.is_final
  end
end
clear_batch(current_time)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 212
#
# Starts a fresh, empty batch and restarts its timeout clock.
#
# @param current_time [Time] the new value of @last_execution_time
def clear_batch(current_time)
  @last_execution_time = current_time
  @current_batch_size = 0
  @current_batch = []
end
get_response_ttl(res)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 317
#
# Number of seconds a classification response should stay cached.
#
# @param res response object; must respond to #is_successful and #response
# @param default_ttl [Numeric] TTL used when the response body carries no usable
#   'ttlseconds' value (missing body, missing key, non-numeric or negative)
# @return [Numeric] ERROR_TTL_SECS for unsuccessful responses; otherwise the
#   body's 'ttlseconds' or default_ttl
def get_response_ttl(res, default_ttl = 60)
  return ERROR_TTL_SECS unless res.is_successful

  response_body = res.response
  ttl = response_body && response_body['ttlseconds']

  # a non-numeric ttl would otherwise raise on the comparison below
  return default_ttl unless ttl.is_a?(Numeric) && ttl >= 0

  ttl
end