class Fluent::Plugin::SamplingFilter
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_sampling.rb, line 14 def configure(conf) super @counts = {} @resets = {} if @minimum_rate_per_min @accessor = record_accessor_create(@sample_unit) unless %w(all tag).include?(@sample_unit) end
filter(tag, _time, record)
click to toggle source
Access to @counts SHOULD be protected by mutex, with a heavy penalty. Code below is not thread safe, but @counts (counter for sampling rate) is not so serious value (and probably will not be broken…), then i let here as it is now.
# File lib/fluent/plugin/filter_sampling.rb, line 27 def filter(tag, _time, record) t = record_key(tag, record) if @minimum_rate_per_min filter_with_minimum_rate(t, record) else filter_simple(t, record) end end
filter_simple(t, record)
click to toggle source
# File lib/fluent/plugin/filter_sampling.rb, line 36 def filter_simple(t, record) c = (@counts[t] = @counts.fetch(t, 0) + 1) # reset only just before @counts[t] is to be bignum from fixnum @counts[t] = 0 if c > 0x6fffffff if c % @interval == 0 record else nil end end
filter_with_minimum_rate(t, record)
click to toggle source
# File lib/fluent/plugin/filter_sampling.rb, line 47 def filter_with_minimum_rate(t, record) @resets[t] ||= Fluent::Clock.now + (60 - rand(30)) if Fluent::Clock.now > @resets[t] @resets[t] = Fluent::Clock.now + 60 @counts[t] = 0 end c = (@counts[t] = @counts.fetch(t, 0) + 1) if c < @minimum_rate_per_min || c % @interval == 0 record.dup else nil end end
record_key(tag, record)
click to toggle source
# File lib/fluent/plugin/filter_sampling.rb, line 61 def record_key(tag, record) case @sample_unit when 'all' 'all' when 'tag' tag else @accessor.call(record) end end