class Fluent::Plugin::SamplingFilterOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_sampling_filter.rb, line 15 def configure(conf) super log.warn "sampling_filter output plugin is deprecated. use sampling_filter filter plugin instead with <label> routing." if @remove_prefix @removed_prefix_string = @remove_prefix + '.' @removed_length = @removed_prefix_string.length elsif @add_prefix.empty? raise Fluent::ConfigError, "either of 'add_prefix' or 'remove_prefix' must be specified" end @added_prefix_string = nil @added_prefix_string = @add_prefix + '.' unless @add_prefix.empty? @counts = {} @resets = {} if @minimum_rate_per_min end
emit_sampled(tag, time_record_pairs)
click to toggle source
# File lib/fluent/plugin/out_sampling_filter.rb, line 33 def emit_sampled(tag, time_record_pairs) if @remove_prefix and ( (tag.start_with?(@removed_prefix_string) and tag.length > @removed_length) or tag == @remove_prefix) tag = tag[@removed_length..-1] end if tag.length > 0 tag = @added_prefix_string + tag if @added_prefix_string else tag = @add_prefix end time_record_pairs.each {|t,r| router.emit(tag, t, r) } end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_sampling_filter.rb, line 49 def process(tag, es) t = if @sample_unit == :all 'all' else tag end pairs = [] # Access to @counts SHOULD be protected by mutex, with a heavy penalty. # Code below is not thread safe, but @counts (counter for sampling rate) is not # so serious value (and probably will not be broken...), # then i let here as it is now. if @minimum_rate_per_min @resets[t] ||= Fluent::Clock.now + (60 - rand(30)) if Fluent::Clock.now > @resets[t] @resets[t] = Fluent::Clock.now + 60 @counts[t] = 0 end es.each do |time,record| c = (@counts[t] = @counts.fetch(t, 0) + 1) if c < @minimum_rate_per_min or c % @interval == 0 pairs.push [time, record] end end else es.each do |time,record| c = (@counts[t] = @counts.fetch(t, 0) + 1) if c % @interval == 0 pairs.push [time, record] # reset only just before @counts[t] is to be bignum from fixnum @counts[t] = 0 if c > 0x6fffffff end end end emit_sampled(tag, pairs) end