class Fluent::Plugin::LokiOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 71 def configure(conf) compat_parameters_convert(conf, :buffer) super @label_keys = @label_keys.split(/\s*,\s*/) if @label_keys @remove_keys = @remove_keys.split(',').map(&:strip) if @remove_keys @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert @key = OpenSSL::PKey::RSA.new(File.read(key)) if @key if !@ca_cert.nil? && !File.exist?(@ca_cert) raise "CA certificate file #{@ca_cert} not found" end end
generic_to_loki(chunk)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 145 def generic_to_loki(chunk) # log.debug("GenericToLoki: converting #{chunk}") streams = chunk_to_loki(chunk) payload = payload_builder(streams) payload end
http_opts(uri)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 90 def http_opts(uri) opts = { use_ssl: uri.scheme == 'https' } opts end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 86 def multi_workers_ready? true end
write(chunk)
click to toggle source
flush a chunk to loki
# File lib/fluent/plugin/out_loki.rb, line 98 def write(chunk) # streams by label payload = generic_to_loki(chunk) body = { 'streams': payload } # add ingest path to loki url uri = URI.parse(url + '/api/prom/push') req = Net::HTTP::Post.new( uri.request_uri ) req.add_field('Content-Type', 'application/json') req.add_field('X-Scope-OrgID', @tenant) if @tenant req.body = Yajl.dump(body) req.basic_auth(@username, @password) if @username opts = { use_ssl: uri.scheme == 'https' } if !@cert.nil? && !@key.nil? opts = opts.merge( verify_mode: OpenSSL::SSL::VERIFY_PEER, cert: @cert, key: @key ) end if !@ca_cert.nil? opts = opts.merge( ca_file: @ca_cert ) end log.debug "sending #{req.body.length} bytes to loki" res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) } unless res && res.is_a?(Net::HTTPSuccess) res_summary = if res "#{res.code} #{res.message} #{res.body}" else 'res=nil' end log.warn "failed to #{req.method} #{uri} (#{res_summary})" log.warn Yajl.dump(body) end end
Private Instance Methods
chunk_to_loki(chunk)
click to toggle source
iterate through each chunk and create a loki stream entry
# File lib/fluent/plugin/out_loki.rb, line 245 def chunk_to_loki(chunk) streams = {} last_time = nil chunk.each do |time, record| # each chunk has a unique set of labels last_time = time if last_time.nil? result = line_to_loki(record) chunk_labels = result[:labels] # initialize a new stream with the chunk_labels if it does not exist streams[chunk_labels] = [] if streams[chunk_labels].nil? # NOTE: timestamp must include nanoseconds # append to matching chunk_labels key streams[chunk_labels].push( 'ts' => Time.at(time.to_f).gmtime.iso8601(6), 'line' => result[:line] ) end streams end
labels_to_protocol(data_labels)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 160 def labels_to_protocol(data_labels) formatted_labels = [] # merge extra_labels with data_labels. If there are collisions extra_labels win. data_labels = {} if data_labels.nil? data_labels = data_labels.merge(@extra_labels) data_labels.each do |k, v| formatted_labels.push("#{k}=\"#{v.gsub('"','\\"')}\"") if v end '{' + formatted_labels.join(',') + '}' end
line_to_loki(record)
click to toggle source
convert a line to loki line with labels
# File lib/fluent/plugin/out_loki.rb, line 208 def line_to_loki(record) chunk_labels = {} line = '' if record.is_a?(Hash) # remove needless keys. @remove_keys.each { |v| record.delete(v) } if @remove_keys # extract white listed record keys into labels. @label_keys.each do |k| next unless record.key?(k) record_value = record[k] if record_value.is_a?(Hash) record_value.each do |hk, hv| chunk_labels[hk] = hv end else chunk_labels[k] = record_value end record.delete(k) end if @label_keys line = record_to_line(record) else line = record.to_s end # return both the line content plus the labels found in the record { line: line, labels: chunk_labels } end
numeric?(val)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 154 def numeric?(val) !Float(val).nil? rescue StandardError false end
payload_builder(streams)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 173 def payload_builder(streams) payload = [] streams.each do |k, v| # create a stream for each label set. # Additionally sort the entries by timestamp just incase we # got them out of order. # 'labels' => '{worker="0"}', payload.push( 'labels' => labels_to_protocol(k), 'entries' => v.sort_by { |hsh| Time.parse(hsh['ts']) } ) end payload end
record_to_line(record)
click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 188 def record_to_line(record) line = '' if @drop_single_key && record.keys.length == 1 line = record[record.keys[0]] else case @line_format when :json line = Yajl.dump(record) when :key_value formatted_labels = [] record.each do |k, v| formatted_labels.push("#{k}=\"#{v}\"") end line = formatted_labels.join(' ') end end line end