class Fluent::Plugin::PgHStoreOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 25 def initialize super @conn = nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 30 def configure(conf) compat_parameters_convert(conf, :buffer) super raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 50 def format(tag, time, record) [time, record].to_msgpack end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 54 def formatted_to_msgpack_binary true end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 42 def shutdown super if @conn != nil and @conn.finished?() == false conn.close() end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pghstore.rb, line 36 def start super create_table(@table) unless table_exists?(@table) end
table_schema(tablename)
click to toggle source
for tests.
# File lib/fluent/plugin/out_pghstore.rb, line 76 def table_schema(tablename) sql =<<"SQL" CREATE TABLE #{tablename} ( tag TEXT[], time TIMESTAMP WITH TIME ZONE, record HSTORE ); SQL sql end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 58 def write(chunk) conn = get_connection() return if conn == nil # TODO: chunk will be dropped. should retry? tag = chunk.metadata.tag chunk.msgpack_each {|(time_str, record)| sql = generate_sql(conn, tag, time_str, record) begin conn.exec(sql) rescue PG::Error => e log.error "PG::Error: " + e.message # dropped if error end } conn.close() end
Private Instance Methods
create_table(tablename)
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 142 def create_table(tablename) sql = table_schema(tablename) sql += @table_option if @table_option conn = get_connection() raise "Could not connect the database at create_table. abort." if conn == nil begin conn.exec(sql) rescue PG::Error => e log.error "Error at create_table:" + e.message log.error "SQL:" + sql end conn.close log.warn "table #{tablename} was not exist. created it." end
generate_sql(conn, tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 89 def generate_sql(conn, tag, time, record) kv_list = [] record.each {|(key,value)| kv_list.push("\"#{conn.escape_string(key.to_s)}\" => \"#{conn.escape_string(value.to_s)}\"") } tag_list = tag.split(".") tag_list.map! {|t| "'" + t + "'"} sql =<<"SQL" INSERT INTO #{@table} (tag, time, record) VALUES (ARRAY[#{tag_list.join(",")}], '#{Time.at(time)}'::TIMESTAMP WITH TIME ZONE, E'#{kv_list.join(",")}'); SQL return sql end
get_connection()
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 106 def get_connection() if @conn != nil and @conn.finished?() == false return @conn # connection is alived end begin if @user @conn = PG.connect(:dbname => @database, :host => @host, :port => @port, :user => @user, :password => @password) else @conn = PG.connect(:dbname => @database, :host => @host, :port => @port) end rescue PG::Error => e log.error "Error: could not connect database:" + @database return nil end return @conn end
table_exists?(table)
click to toggle source
# File lib/fluent/plugin/out_pghstore.rb, line 127 def table_exists?(table) sql =<<"SQL" SELECT COUNT(*) FROM pg_tables WHERE tablename = '#{table}'; SQL conn = get_connection() raise "Could not connect the database at startup. abort." if conn == nil res = conn.exec(sql) conn.close if res[0]["count"] == "1" return true else return false end end