class Threatinator::FeedRunner
Runs those feeds!
Has the following observations: :start - start of feed parsing :start_fetch - start of fetching :end_fetch - end of fetching :start_decode - start of decoding :end_decode - end of decoding :start_parse_record - start of record parse - record - The record :record_filtered - Indicates that the record was filtered. - record - The record :record_missed - Indicates that the record was not parsed - record - The record :record_parsed - Indicates that the record WAS parsed - record - The record - events - The events that were parsed out of the record :record_error - Indicates that the record WAS parsed - record - The record - errors - An array of exceptions that caused the error :end_parse_record - when a record has been parsed - record - The record :end - completion of feed parsing
Public Class Methods
new(feed, output_formatter, opts = {})
click to toggle source
@param [Threatinator::Feed] feed The feed that we want to run. @param [Threatinator::Output] output_formatter
# File lib/threatinator/feed_runner.rb, line 46 def initialize(feed, output_formatter, opts = {}) @feed = feed @output_formatter = output_formatter @feed_filters = @feed.filter_builders.map { |x| x.call } @decoders = @feed.decoder_builders.map { |x| x.call } @parser_block = @feed.parser_block @create_event_proc = self.method(:create_event).to_proc @event_builder = Threatinator::EventBuilder.new(@feed.provider, @feed.name) @total_events_built = 0 @event_errors = [] @built_events = [] end
run(feed, output, run_opts = {})
click to toggle source
Runs a feed @param [Threatinator::Feed] feed The feed to run @param [Threatinator::Output] output The output instance @param [Hash] run_opts Options passed to run
. See run
.
# File lib/threatinator/feed_runner.rb, line 172 def self.run(feed, output, run_opts = {}) self.new(feed, output).run(run_opts) end
Public Instance Methods
create_event() { |event_builder| ... }
click to toggle source
# File lib/threatinator/feed_runner.rb, line 122 def create_event @event_builder.reset @event_errors.clear yield(@event_builder) begin event = @event_builder.build @total_events_built += 1 @built_events << event rescue Threatinator::Exceptions::EventBuildError => e @event_errors << e end end
parse_record(record)
click to toggle source
# File lib/threatinator/feed_runner.rb, line 135 def parse_record(record) @built_events.clear events = [] changed(true); notify_observers(:start_parse_record, record) if @feed_filters.any? { |filter| filter.filter?(record) } changed(true); notify_observers(:record_filtered, record) return end @parser_block.call(@create_event_proc, record) if @event_errors.count > 0 changed(true); notify_observers(:record_error, record, @event_errors) position = "line: #{record.line_number}, start: #{record.pos_start}, end: #{record.pos_end}" messages = @event_errors.map { |e| e.to_s }.join(', ') logger.debug("Error generating event from record (#{position}): #{messages}") elsif @built_events.count == 0 changed(true); notify_observers(:record_missed, record) position = "line: #{record.line_number}, start: #{record.pos_start}, end: #{record.pos_end}" logger.debug("Expected event to be generated, but got none from record (#{position})") else @built_events.each do |event| events << event @output_formatter.handle_event(event) end changed(true); notify_observers(:record_parsed, record, events) end return ensure changed(true); notify_observers(:end_parse_record, record) end
run(opts = {})
click to toggle source
@param [Hash] opts The options hash @option opts [IO-like] :io Override the fetcher by providing
an IO directly.
@option opts [Boolean] :skip_decoding (false) Skip all decoding if set
to true. Useful for testing.
# File lib/threatinator/feed_runner.rb, line 66 def run(opts = {}) ios = [ ] logger.debug("#run starting #{@feed.provider}:#{@feed.name}") if logger.debug? start = Time.now changed(true); notify_observers(:start) skip_decoding = !!opts.delete(:skip_decoding) unless io = opts.delete(:io) fetcher = @feed.fetcher_builder.call() changed(true); notify_observers(:start_fetch) io = fetcher.fetch() changed(true); notify_observers(:end_fetch) else logger.debug('#run Skipping fetch. IO object was provided') end ios << io unless skip_decoding == true changed(true); notify_observers(:start_decode) @decoders.each do |decoder| new_io = decoder.decode(io) ios << new_io io = new_io end changed(true); notify_observers(:end_decode) end parser = @feed.parser_builder.call() parser.run(io) do |record| rr = parse_record(record) end changed(true); notify_observers(:end) logger.debug("#run finished #{@feed.provider}:#{@feed.name} in #{Time.now - start} seconds") if logger.debug? nil ensure # Close all IO objects that we've seen. while some_io = ios.pop unless some_io.closed? begin some_io.close rescue => e #:nocov: logger.warn("Failed to close IO: #{e} #{e.message}") #:nocov: end end end @output_formatter.finish end