class Fluent::Plugin::PgHStoreOutput

Constants

DEFAULT_BUFFER_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 25
# Sets up the plugin instance. The superclass initializer runs first, then
# the cached connection slot is cleared so that get_connection opens a
# fresh connection on first use.
def initialize
  super()
  @conn = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 30
# Applies the plugin configuration.
#
# conf - the configuration object; it is passed through
#        compat_parameters_convert for the :buffer section and then handed
#        to the superclass.
#
# Raises Fluent::ConfigError when @chunk_key_tag is not set after the
# superclass has processed the configuration (presumably meaning 'tag' is
# missing from the buffer chunk keys -- the flag is set outside this method).
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  return if @chunk_key_tag

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 50
# Serializes a single event for buffering.
#
# tag    - the event tag; not included in the serialized payload.
# time   - the event time.
# record - the event record.
#
# Returns the msgpack encoding of the two-element array [time, record].
def format(tag, time, record)
  event = [time, record]
  event.to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 54
# Always returns true.
# NOTE(review): presumably this tells the Fluentd buffered-output framework
# that #format emits msgpack binary, which is what lets #write iterate the
# chunk with msgpack_each -- confirm against the Fluentd plugin API.
def formatted_to_msgpack_binary
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 42
# Shuts the plugin down: runs the superclass shutdown first, then closes the
# cached PostgreSQL connection if one exists and has not been finished yet.
def shutdown
  super

  # Must be the @conn instance variable: there is no local `conn` in this
  # method, so a bare `conn.close` raises NameError.
  @conn.close if @conn && !@conn.finished?
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 36
# Starts the plugin: runs the superclass start, then makes sure the
# destination table (@table) exists, creating it when table_exists?
# reports that it is missing.
def start
  super

  return if table_exists?(@table)

  create_table(@table)
end
table_schema(tablename) click to toggle source

Returns the CREATE TABLE statement for the given table name. Public so that tests can use it.

# File lib/fluent/plugin/out_pghstore.rb, line 76
  # Builds the CREATE TABLE statement for the destination table.
  #
  # tablename - name inserted verbatim after CREATE TABLE.
  #
  # Returns the statement as a String ending in ";\n". The table has three
  # columns: tag (TEXT[]), time (TIMESTAMP WITH TIME ZONE), record (HSTORE).
  def table_schema(tablename)
    statement_lines = [
      "CREATE TABLE #{tablename} (",
      "  tag TEXT[],",
      "  time TIMESTAMP WITH TIME ZONE,",
      "  record HSTORE",
      ");"
    ]
    statement_lines.join("\n") + "\n"
  end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 58
# Writes every event in a buffer chunk to PostgreSQL, one INSERT per event.
#
# chunk - buffer chunk; its metadata supplies the tag and msgpack_each
#         yields the [time, record] pairs produced by #format.
#
# If no connection can be obtained the method returns without writing
# anything. A PG::Error on an individual INSERT is logged and that event is
# skipped. The connection is closed once the whole chunk has been processed.
def write(chunk)
  connection = get_connection
  # TODO: chunk will be dropped. should retry?
  return if connection.nil?

  event_tag = chunk.metadata.tag
  chunk.msgpack_each do |(event_time, fields)|
    statement = generate_sql(connection, event_tag, event_time, fields)
    begin
      connection.exec(statement)
    rescue PG::Error => error
      # The failing event is dropped; later events are still attempted.
      log.error "PG::Error: " + error.message
    end
  end

  connection.close
end

Private Instance Methods

create_table(tablename) click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 142
# Creates the destination table.
#
# tablename - name of the table to create.
#
# The statement is table_schema(tablename) with @table_option appended when
# that option is set.
# NOTE(review): table_schema ends with ";\n", so @table_option lands after
# the statement terminator -- confirm that this is the intended placement.
#
# Raises RuntimeError when no database connection can be obtained.
# A PG::Error from the CREATE TABLE itself is logged, not raised.
def create_table(tablename)
  sql = table_schema(tablename)
  sql += @table_option if @table_option

  conn = get_connection
  raise "Could not connect the database at create_table. abort." if conn.nil?

  begin
    conn.exec(sql)
    # Report creation only after the statement actually succeeded.
    log.warn "table #{tablename} was not exist. created it."
  rescue PG::Error => e
    log.error "Error at create_table:" + e.message
    log.error "SQL:" + sql
  ensure
    # Release the connection on every path, including unexpected errors.
    conn.close
  end
end
generate_sql(conn, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 89
  # Builds the INSERT statement for one event.
  #
  # conn   - connection used for escape_string.
  # tag    - dot-separated event tag; stored as a TEXT[] of its segments.
  # time   - event time, converted with Time.at.
  # record - key/value pairs stored as hstore; keys and values are
  #          stringified with to_s.
  #
  # Returns the SQL statement as a String ending in ";\n".
  def generate_sql(conn, tag, time, record)
    # Renders one hstore key or value as a double-quoted token. Embedded
    # double quotes get a backslash so the hstore parser does not treat them
    # as the end of the token; the backslash is doubled because the value is
    # placed inside an E'...' literal, which consumes one level of escaping.
    quote_hstore = lambda do |item|
      escaped = conn.escape_string(item.to_s).gsub('"') { '\\\\"' }
      "\"#{escaped}\""
    end
    # NOTE(review): backslashes inside keys/values are still interpreted by
    # the E'...' literal, as before; whether escape_string doubles them is
    # presumably tied to the connection's standard_conforming_strings setting
    # -- confirm.
    pairs = record.map { |key, value| "#{quote_hstore.call(key)} => #{quote_hstore.call(value)}" }

    # Tag segments are escaped too, so a quote in a tag cannot break out of
    # its string literal.
    tag_items = tag.split(".").map { |part| "'#{conn.escape_string(part)}'" }

    "INSERT INTO #{@table} (tag, time, record) VALUES\n" \
      "(ARRAY[#{tag_items.join(",")}], '#{Time.at(time)}'::TIMESTAMP WITH TIME ZONE, " \
      "E'#{pairs.join(",")}');\n"
  end
get_connection() click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 106
# Returns a usable PostgreSQL connection, or nil when connecting fails.
#
# The cached connection (@conn) is reused while it has not been finished.
# Otherwise a new one is opened from @database, @host and @port, plus
# @user / @password when @user is set, and cached in @conn.
#
# A PG::Error while connecting is logged, including the driver's message,
# and nil is returned; @conn is left unchanged in that case.
def get_connection()
  # Reuse the cached connection while it is still open.
  return @conn if @conn && !@conn.finished?

  params = { :dbname => @database, :host => @host, :port => @port }
  params.update(:user => @user, :password => @password) if @user

  begin
    @conn = PG.connect(params)
  rescue PG::Error => e
    # Interpolation keeps this safe when @database is nil, and the driver's
    # message says why the connection failed.
    log.error "Error: could not connect database:#{@database} (#{e.message})"
    return nil
  end

  @conn
end
table_exists?(table) click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 127
  # Checks whether a table with the given name is listed in pg_tables.
  #
  # table - table name to look up; escaped before being placed in the query.
  #
  # Returns true when at least one matching row exists, false otherwise.
  # Raises RuntimeError when no database connection can be obtained.
  def table_exists?(table)
    conn = get_connection
    raise "Could not connect the database at startup. abort." if conn.nil?

    begin
      sql = "SELECT COUNT(*) FROM pg_tables WHERE tablename = '#{conn.escape_string(table.to_s)}';\n"
      res = conn.exec(sql)
    ensure
      # Close the connection even if the query raised.
      conn.close
    end

    # More than one row can match the same name, so any positive count means
    # the table exists.
    res[0]["count"].to_i > 0
  end