class Embulk::Schema

Attributes

names[R]
types[R]

Public Class Methods

from_java(java_schema)
# File lib/embulk/schema.rb, line 97
# Builds a schema from +java_schema+: each element of its getColumns list is
# converted with Column.from_java and the converted list is passed to +new+.
def self.from_java(java_schema)
  ruby_columns = java_schema.getColumns.map do |java_column|
    Column.from_java(java_column)
  end
  new(ruby_columns)
end
new(columns)
Calls superclass method.
# File lib/embulk/schema.rb, line 7
# Builds a schema from +columns+ and precompiles its record reader and writer.
#
# columns:: list of column-like objects responding to +index+, +name+, +type+
#           and +format+. A falsy +index+ is accepted; a truthy one must equal
#           the column's position in the list.
#
# Every element is rebuilt as a Column carrying its positional index, and the
# rebuilt list is passed to the superclass initializer.
# NOTE(review): the superclass is not visible here; the use of +each+ and
# +map+ on self suggests an Array-like parent -- confirm.
#
# Two lambdas are then assembled as Ruby source text, one snippet per column,
# and compiled with eval:
# * @record_reader takes a reader and returns an Array with one entry per
#   column (nil where reader.isNull(idx) is true).
# * @record_writer takes a builder and a record, sets each column on the
#   builder (setNull where record[idx] is nil) and ends with builder.addRecord.
# The only per-column value interpolated into the generated source is
# column.index.
#
# Finally @names and @types are filled from the columns and the instance is
# frozen.
#
# Raises RuntimeError when a column's explicit index disagrees with its
# position, or when a column type is not one of :boolean, :long, :double,
# :string, :timestamp or :json.
def initialize(columns)
  # Re-create every column with its positional index, rejecting columns whose
  # own (truthy) index contradicts that position.
  columns = columns.map.with_index {|c,index|
    if c.index && c.index != index
      # TODO ignore this error?
      raise "Index of column '#{c.name}' is #{c.index} but it is at column #{index}."
    end
    Column.new(index, c.name, c.type, c.format)
  }
  super(columns)

  # --- Reader: source text of a lambda that turns one reader row into an Array.
  record_reader_script =
    "lambda do |reader|\n" <<
    "record = []\n"
  each do |column|
    idx = column.index
    # One if/else per column. The per-type snippet has no trailing newline, so
    # the closing "end\n" follows it directly on the same generated line.
    column_script =
      "if reader.isNull(#{idx})\n" <<
      "record << nil\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "record << reader.getBoolean(#{idx})"
      when :long
        "record << reader.getLong(#{idx})"
      when :double
        "record << reader.getDouble(#{idx})"
      when :string
        "record << reader.getString(#{idx})"
      when :timestamp
        # Builds a RubyTime from the timestamp's toEpochMilli(), switches it to
        # gmtime, then calls setNSec with the timestamp's getNano().
        "record << (java_timestamp = reader.getTimestamp(#{idx}); ruby_time = Java::org.jruby.RubyTime.new(JRuby.runtime, JRuby.runtime.getClass('Time'), Java::org.joda.time.DateTime.new(java_timestamp.toEpochMilli())).gmtime().to_java(Java::org.jruby.RubyTime); ruby_time.setNSec(java_timestamp.getNano()); ruby_time)"
      when :json
        # Packs reader.getJson(idx) to a byte array with the Java MessagePack
        # buffer packer, then decodes those bytes with Ruby's MessagePack.unpack.
        "record << MessagePack.unpack(String.from_java_bytes((::Java::org.msgpack.core.MessagePack.newDefaultBufferPacker()).packValue(reader.getJson(#{idx})).toMessageBuffer().toByteArray()))"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_reader_script << column_script << "\n"
  end
  # The generated lambda's last expression is the filled record array.
  record_reader_script << "record\n"
  record_reader_script << "end"
  @record_reader = eval(record_reader_script)

  # --- Writer: source text of a lambda that pushes one record into a builder.
  record_writer_script = "lambda do |builder,record|\n"
  # Local alias used by the generated :timestamp snippets.
  record_writer_script << "java_timestamp_class = ::Embulk::Java::Timestamp\n"
  each do |column|
    idx = column.index
    # Same if/else shape as the reader: nil entries become setNull, everything
    # else goes through the type-specific setter.
    column_script =
      "if record[#{idx}].nil?\n" <<
      "builder.setNull(#{idx})\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "builder.setBoolean(#{idx}, record[#{idx}])"
      when :long
        "builder.setLong(#{idx}, record[#{idx}])"
      when :double
        "builder.setDouble(#{idx}, record[#{idx}])"
      when :string
        "builder.setString(#{idx}, record[#{idx}])"
      when :timestamp
        # It was originally expecting that `record[#{idx}]` was a Ruby Time object. Does it really happen?
        # Seconds are msec / 1000; the second argument is getNSec() plus the
        # millisecond remainder scaled by 1000000.
        # NOTE(review): presumably getNSec() holds only the sub-millisecond
        # part -- confirm against JRuby's RubyTime.
        "(ruby_time = record[#{idx}].to_java(Java::org.jruby.RubyTime); msec = ruby_time.getDateTime().getMillis(); builder.setTimestamp(#{idx}, java_timestamp_class.ofEpochSecond(msec / 1000, ruby_time.getNSec() + (msec % 1000) * 1000000)))"
      when :json
        # Encodes record[idx] with Ruby's MessagePack.pack and decodes the bytes
        # with the Java MessagePack unpacker's unpackValue().
        "builder.setJson(#{idx}, ::Java::org.msgpack.core.MessagePack.newDefaultUnpacker(MessagePack.pack(record[#{idx}]).to_java_bytes).unpackValue())"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_writer_script << column_script << "\n"
  end
  # After all columns are set, the generated lambda commits the row.
  record_writer_script << "builder.addRecord\n"
  record_writer_script << "end"
  @record_writer = eval(record_writer_script)

  # Column names and types, in column order.
  @names = map {|c| c.name }
  @types = map {|c| c.type }

  # No further mutation of the schema after construction.
  freeze
end

Public Instance Methods

read_record(page_reader)
# File lib/embulk/schema.rb, line 89
# Invokes the reader lambda stored in @record_reader with +page_reader+ and
# returns that lambda's result.
def read_record(page_reader)
  @record_reader.(page_reader)
end
to_java()
# File lib/embulk/schema.rb, line 101
# Converts every column with its own +to_java+ and wraps the converted list
# in a new Java::Schema.
def to_java
  Java::Schema.new(map { |col| col.to_java })
end
write_record(page_builder, record)
# File lib/embulk/schema.rb, line 93
# Invokes the writer lambda stored in @record_writer with +page_builder+ and
# +record+, returning that lambda's result.
def write_record(page_builder, record)
  @record_writer.(page_builder, record)
end