class Fluent::Plugin::LokiOutput

Subclass of Fluent Plugin Output

Constants

DEFAULT_BUFFER_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_loki.rb, line 71
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  @label_keys = @label_keys.split(/\s*,\s*/) if @label_keys
  @remove_keys = @remove_keys.split(',').map(&:strip) if @remove_keys

  @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
  @key = OpenSSL::PKey::RSA.new(File.read(key)) if @key

  if !@ca_cert.nil? && !File.exist?(@ca_cert)
    raise "CA certificate file #{@ca_cert} not found"
  end
end
generic_to_loki(chunk) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 145
def generic_to_loki(chunk)
  # log.debug("GenericToLoki: converting #{chunk}")
  streams = chunk_to_loki(chunk)
  payload = payload_builder(streams)
  payload
end
http_opts(uri) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 90
def http_opts(uri)
  opts = {
    use_ssl: uri.scheme == 'https'
  }
  opts
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 86
def multi_workers_ready?
  true
end
write(chunk) click to toggle source

flush a chunk to loki

# File lib/fluent/plugin/out_loki.rb, line 98
def write(chunk)
  # streams by label
  payload = generic_to_loki(chunk)
  body = { 'streams': payload }

  # add ingest path to loki url
  uri = URI.parse(url + '/api/prom/push')

  req = Net::HTTP::Post.new(
    uri.request_uri
  )
  req.add_field('Content-Type', 'application/json')
  req.add_field('X-Scope-OrgID', @tenant) if @tenant
  req.body = Yajl.dump(body)
  req.basic_auth(@username, @password) if @username
  opts = {
    use_ssl: uri.scheme == 'https'
  }

  if !@cert.nil? && !@key.nil?
    opts = opts.merge(
      verify_mode: OpenSSL::SSL::VERIFY_PEER,
      cert: @cert,
      key: @key
    )
  end

  if !@ca_cert.nil?
    opts = opts.merge(
      ca_file: @ca_cert
    )
  end

  log.debug "sending #{req.body.length} bytes to loki"
  res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
  unless res && res.is_a?(Net::HTTPSuccess)
    res_summary = if res
                    "#{res.code} #{res.message} #{res.body}"
                  else
                    'res=nil'
                  end
    log.warn "failed to #{req.method} #{uri} (#{res_summary})"
    log.warn Yajl.dump(body)

  end
end

Private Instance Methods

chunk_to_loki(chunk) click to toggle source

iterate through each chunk and create a loki stream entry

# File lib/fluent/plugin/out_loki.rb, line 245
def chunk_to_loki(chunk)
  streams = {}
  last_time = nil
  chunk.each do |time, record|
    # each chunk has a unique set of labels
    last_time = time if last_time.nil?
    result = line_to_loki(record)
    chunk_labels = result[:labels]
    # initialize a new stream with the chunk_labels if it does not exist
    streams[chunk_labels] = [] if streams[chunk_labels].nil?
    # NOTE: timestamp must include nanoseconds
    # append to matching chunk_labels key
    streams[chunk_labels].push(
      'ts' => Time.at(time.to_f).gmtime.iso8601(6),
      'line' => result[:line]
    )
  end
  streams
end
labels_to_protocol(data_labels) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 160
def labels_to_protocol(data_labels)
  formatted_labels = []

  # merge extra_labels with data_labels. If there are collisions extra_labels win.
  data_labels = {} if data_labels.nil?
  data_labels = data_labels.merge(@extra_labels)

  data_labels.each do |k, v|
    formatted_labels.push("#{k}=\"#{v.gsub('"','\\"')}\"") if v
  end
  '{' + formatted_labels.join(',') + '}'
end
line_to_loki(record) click to toggle source

convert a line to loki line with labels

# File lib/fluent/plugin/out_loki.rb, line 208
def line_to_loki(record)
  chunk_labels = {}
  line = ''
  if record.is_a?(Hash)
    # remove needless keys.
    @remove_keys.each { |v|
      record.delete(v)
    } if @remove_keys
    # extract white listed record keys into labels.
    @label_keys.each do |k|
      next unless record.key?(k)

      record_value = record[k]

      if record_value.is_a?(Hash)
        record_value.each do |hk, hv|
          chunk_labels[hk] = hv
        end
      else
        chunk_labels[k] = record_value
      end

      record.delete(k)
    end if @label_keys
    line = record_to_line(record)
  else
    line = record.to_s
  end

  # return both the line content plus the labels found in the record
  {
    line: line,
    labels: chunk_labels
  }
end
numeric?(val) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 154
def numeric?(val)
  !Float(val).nil?
rescue StandardError
  false
end
payload_builder(streams) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 173
def payload_builder(streams)
  payload = []
  streams.each do |k, v|
    # create a stream for each label set.
    # Additionally sort the entries by timestamp just incase we
    # got them out of order.
    # 'labels' => '{worker="0"}',
    payload.push(
      'labels' => labels_to_protocol(k),
      'entries' => v.sort_by { |hsh| Time.parse(hsh['ts']) }
    )
  end
  payload
end
record_to_line(record) click to toggle source
# File lib/fluent/plugin/out_loki.rb, line 188
def record_to_line(record)
  line = ''
  if @drop_single_key && record.keys.length == 1
    line = record[record.keys[0]]
  else
    case @line_format
    when :json
      line = Yajl.dump(record)
    when :key_value
      formatted_labels = []
      record.each do |k, v|
        formatted_labels.push("#{k}=\"#{v}\"")
      end
      line = formatted_labels.join(' ')
    end
  end
  line
end