module TreasureData::API::Job

Public Instance Methods

hive_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) click to toggle source

@param [String] q @param [String] db @param [String] result_url @param [Fixnum] priority @param retry_limit @param [Hash] opts @return [String] job_id

# File lib/td/client/api/job.rb, line 221
# Issue a query with the job type fixed to :hive.
#
# All arguments are forwarded unchanged to #query.
#
# @param [String] q
# @param [String] db
# @param [String] result_url
# @param [Fixnum] priority
# @param retry_limit
# @param [Hash] opts
# @return [String] job_id (as returned by #query)
def hive_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  forwarded = [db, result_url, priority, retry_limit, opts]
  query(q, :hive, *forwarded)
end
job_result(job_id) click to toggle source

@param [String] job_id @return [Array]

# File lib/td/client/api/job.rb, line 123
# Download the result of a job and return every unpacked row.
#
# Each chunk yielded by #job_result_download is fed to a single
# MessagePack::Unpacker, and the rows it produces are collected in order.
#
# @param [String] job_id
# @return [Array] all rows of the result
def job_result(job_id)
  unpacker = MessagePack::Unpacker.new
  rows = []
  job_result_download(job_id) { |chunk|
    unpacker.feed_each(chunk) { |row| rows << row }
  }
  rows
end
job_result_each(job_id, &block) click to toggle source

block is optional and must accept 1 argument

@param [String] job_id @param [Proc] block @return [nil]

# File lib/td/client/api/job.rb, line 162
# Download the result of a job and hand each unpacked row to the block.
#
# The format is left at the #job_result_download default ('msgpack'), so the
# response is decoded before being fed to the unpacker.
#
# @param [String] job_id
# @param [Proc] block receives one unpacked row at a time
# @return [nil]
def job_result_each(job_id, &block)
  unpacker = MessagePack::Unpacker.new
  job_result_download(job_id) { |chunk| unpacker.feed_each(chunk, &block) }
  nil
end
job_result_each_with_compr_size(job_id) { |unpacked, total| ... } click to toggle source

block is optional and must accept 2 arguments (the unpacked row and the total compressed size received so far)

@param [String] job_id @param [Proc] block @return [nil]

# File lib/td/client/api/job.rb, line 176
# Download the result of a job, yielding each unpacked row together with the
# running size reported by #job_result_download.
#
# The format is left at the #job_result_download default ('msgpack').
# Chunks are always fed to the unpacker, even when no block is given.
#
# @param [String] job_id
# @yield [unpacked, total] one unpacked row and the size received so far
# @return [nil]
def job_result_each_with_compr_size(job_id)
  unpacker = MessagePack::Unpacker.new
  job_result_download(job_id) do |chunk, total|
    unpacker.feed_each(chunk) do |unpacked|
      next unless block_given?
      yield unpacked, total
    end
  end
  nil
end
job_result_format(job_id, format, io=nil) { |total| ... } click to toggle source

block is optional and must accept 1 parameter

@param [String] job_id @param [String] format @param [IO] io @param [Proc] block @return [nil, String]

# File lib/td/client/api/job.rb, line 141
# Download the result of a job in the requested format.
#
# Without +io+ the whole result is accumulated and returned as a String.
# With +io+ each chunk is written to it, the optional block is called with
# the running total after every chunk, and nil is returned.
#
# @param [String] job_id
# @param [String] format
# @param [IO] io optional sink for the downloaded data
# @yield [total] size received so far (only when +io+ is given)
# @return [nil, String]
def job_result_format(job_id, format, io=nil)
  unless io
    buffer = String.new
    job_result_download(job_id, format) { |chunk| buffer << chunk }
    return buffer
  end

  job_result_download(job_id, format) do |chunk, total|
    io.write chunk
    yield total if block_given?
  end
  nil
end
job_result_raw(job_id, format, io = nil) { |total| ... } click to toggle source

@param [String] job_id @param [String] format @param [IO] io @param [Proc] block @return [String, nil]

# File lib/td/client/api/job.rb, line 190
# Download the result of a job without automatic decoding
# (#job_result_download is called with autodecode = false).
#
# Without +io+ the chunks are accumulated and returned as a String.
# With +io+ each chunk is written to it, the optional block is called with
# the running total, and nil is returned.
#
# @param [String] job_id
# @param [String] format
# @param [IO] io optional sink for the downloaded data
# @yield [total] size received so far (only when +io+ is given)
# @return [String, nil]
def job_result_raw(job_id, format, io = nil)
  buffer = (String.new unless io)
  job_result_download(job_id, format, false) do |chunk, total|
    unless io
      buffer << chunk
      next
    end
    io.write(chunk)
    yield total if block_given?
  end
  buffer
end
job_status(job_id) click to toggle source

@param [String] job_id @return [String] job status

# File lib/td/client/api/job.rb, line 111
# Fetch the status of a job from /v3/job/status.
#
# @param [String] job_id
# @return the 'status' field of the JSON response
def job_status(job_id)
  code, body, res = get("/v3/job/status/#{e job_id}")
  raise_error("Get job status failed", res) unless code == "200"

  checked_json(body, %w[status])['status']
end
kill(job_id) click to toggle source

@param [String] job_id @return [String]

# File lib/td/client/api/job.rb, line 205
# Ask the server to kill a job via /v3/job/kill.
#
# @param [String] job_id
# @return the 'former_status' field of the JSON response (nil when absent)
def kill(job_id)
  code, body, res = post("/v3/job/kill/#{e job_id}")
  raise_error("Kill job failed", res) unless code == "200"

  checked_json(body, %w[])['former_status']
end
list_jobs(from=0, to=nil, status=nil, conditions=nil) click to toggle source

@param [Fixnum] from @param [Fixnum] to (to is inclusive) @param [String] status @param [Hash] conditions @return [Array]

# File lib/td/client/api/job.rb, line 13
# List jobs via /v3/job/list.
#
# +from+, +to+ and +status+ are sent as strings when given; +conditions+ is
# merged into the request parameters last, so it can override them.
#
# @param [Fixnum] from
# @param [Fixnum] to (to is inclusive)
# @param [String] status
# @param [Hash] conditions extra request parameters
# @return [Array] one entry per job:
#   [job_id, type, status, query, start_at, end_at, cpu_time, result_size,
#    result_url, priority, retry_limit, nil, database, duration, num_records]
def list_jobs(from=0, to=nil, status=nil, conditions=nil)
  request_params = {}
  {'from' => from, 'to' => to, 'status' => status}.each do |key, value|
    request_params[key] = value.to_s if value
  end
  request_params.merge!(conditions) if conditions

  code, body, res = get("/v3/job/list", request_params)
  raise_error("List jobs failed", res) unless code == "200"

  checked_json(body, %w[jobs])['jobs'].map do |job|
    [
      job['job_id'],
      (job['type'] || '?').to_sym,
      job['status'],
      job['query'],
      job['start_at'],
      job['end_at'],
      job['cpu_time'],
      job['result_size'], # compressed result size in msgpack.gz format
      job['result'],
      job['priority'],
      job['retry_limit'],
      nil,
      job['database'],
      job['duration'],
      job['num_records']
    ]
  end
end
pig_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) click to toggle source

@param [String] q @param [String] db @param [String] result_url @param [Fixnum] priority @param retry_limit @param [Hash] opts @return [String] job_id

# File lib/td/client/api/job.rb, line 231
# Issue a query with the job type fixed to :pig.
#
# All arguments are forwarded unchanged to #query.
#
# @param [String] q
# @param [String] db
# @param [String] result_url
# @param [Fixnum] priority
# @param retry_limit
# @param [Hash] opts
# @return [String] job_id (as returned by #query)
def pig_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  forwarded = [db, result_url, priority, retry_limit, opts]
  query(q, :pig, *forwarded)
end
query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) click to toggle source

@param [String] q @param [Symbol] type @param [String] db @param [String] result_url @param [Fixnum] priority @param retry_limit @param [Hash] opts @return [String] job_id

# File lib/td/client/api/job.rb, line 242
# Issue a query job via /v3/job/issue/<type>/<db>.
#
# The request parameters start from +opts+ plus the query text; 'result',
# 'priority' and 'retry_limit' are set only when the matching argument is given.
#
# @param [String] q
# @param [Symbol] type
# @param [String] db
# @param [String] result_url
# @param [Fixnum] priority
# @param retry_limit
# @param [Hash] opts extra request parameters
# @return [String] job_id
def query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  params = {'query' => q}.merge(opts)
  {'result' => result_url, 'priority' => priority, 'retry_limit' => retry_limit}.each do |key, value|
    params[key] = value if value
  end

  code, body, res = post("/v3/job/issue/#{type}/#{e db}", params)
  raise_error("Query failed", res) unless code == "200"

  checked_json(body, %w[job_id])['job_id'].to_s
end
show_job(job_id) click to toggle source

@param [String] job_id @return [Array]

# File lib/td/client/api/job.rb, line 49
# Fetch the full description of a job from /v3/job/show.
#
# Use v3/job/status (#job_status) instead of this call to poll for the
# completion of a job.
#
# @param [String] job_id
# @return [Array] [type, query, status, url, debug, start_at, end_at, cpu_time,
#   result_size, result, hive_result_schema, priority, retry_limit, nil,
#   database, duration, num_records, linked_result_export_job_id,
#   result_export_target_job_id]
# @raise [JSON::ParserError] when 'hive_result_schema' cannot be parsed
def show_job(job_id)
  code, body, res = get("/v3/job/show/#{e job_id}")
  if code != "200"
    raise_error("Show job failed", res)
  end
  js = checked_json(body, %w[status])
  type = (js['type'] || '?').to_sym
  database = js['database']
  query = js['query']
  status = js['status']
  debug = js['debug']
  url = js['url']
  start_at = js['start_at']
  end_at = js['end_at']
  cpu_time = js['cpu_time']
  result_size = js['result_size'] # compressed result size in msgpack.gz format
  num_records = js['num_records']
  duration = js['duration']
  result = js['result'] # result target URL
  linked_result_export_job_id = js['linked_result_export_job_id']
  result_export_target_job_id = js['result_export_target_job_id']
  hive_result_schema = (js['hive_result_schema'] || '')
  if hive_result_schema.empty?
    hive_result_schema = nil
  else
    begin
      hive_result_schema = JSON.parse(hive_result_schema)
    rescue JSON::ParserError => parse_error
      # this is a workaround for a Known Limitation in the Pig Engine which does not set a default, auto-generated
      #   column name for anonymous columns (such as the ones that are generated from UDF like COUNT or SUM).
      # The schema will contain 'nil' for the name of those columns and that breaks the JSON parser since it violates
      #   the JSON syntax standard.
      if type == :pig && hive_result_schema !~ /[\{\}]/
        # SECURITY: this string comes from the server response, so it must never be passed to eval.
        #   Instead, bare `nil` tokens are rewritten to JSON `null` and the text is parsed as JSON again.
        #   The first alternative of the pattern consumes whole double-quoted strings so that a column
        #   literally named "nil" is left untouched.
        jsonified = hive_result_schema.gsub(/"(?:[^"\\]|\\.)*"|\bnil\b/) { |token|
          token == 'nil' ? 'null' : token
        }
        begin
          hive_result_schema = JSON.parse(jsonified)
        rescue JSON::ParserError
          # still not valid: report the original parse failure
          raise parse_error
        end
        # give anonymous columns a positional name
        hive_result_schema.each_with_index {|col_schema, idx|
          if col_schema[0].nil?
            col_schema[0] = "_col#{idx}"
          end
        }
      else
        raise parse_error
      end
    end
  end
  priority = js['priority']
  retry_limit = js['retry_limit']
  return [type, query, status, url, debug, start_at, end_at, cpu_time,
          result_size, result, hive_result_schema, priority, retry_limit, nil, database, duration, num_records,
          linked_result_export_job_id, result_export_target_job_id]
end

Private Instance Methods

create_inflalte_or_null_inflate(response) click to toggle source
# File lib/td/client/api/job.rb, line 386
# Choose a decoder for a response body.
#
# @param response object whose header['Content-Encoding'] responds to #empty?
# @return a NullInflate when no Content-Encoding is present, otherwise the
#   result of #create_inflate
def create_inflalte_or_null_inflate(response)
  return NullInflate.new if response.header['Content-Encoding'].empty?

  create_inflate(response)
end
create_inflate(response) click to toggle source
# File lib/td/client/api/job.rb, line 394
# Build a Zlib::Inflate matching the response's Content-Encoding.
#
# @param response object whose header['Content-Encoding'] responds to #include?
# @return [Zlib::Inflate] created with Zlib::MAX_WBITS + 16 when the encoding
#   includes 'gzip', otherwise with the default window bits
def create_inflate(response)
  encodings = response.header['Content-Encoding']
  return Zlib::Inflate.new unless encodings.include?('gzip')

  Zlib::Inflate.new(Zlib::MAX_WBITS + 16)
end
job_result_download(job_id, format='msgpack', autodecode=true) { |chunk, current_total_chunk_size| ... } click to toggle source
# File lib/td/client/api/job.rb, line 297
# Stream the result of a job from /v3/job/result, yielding it chunk by chunk.
#
# The listed network errors and HTTPServerException are retried with
# exponential (base 2) back-off starting at @retry_delay, for as long as the
# cumulative delay is below @max_cumul_retry_delay; after that the error is
# re-raised. When some bytes were already received and the server supplied an
# ETag, the retry asks only for the remaining bytes via If-Range / Range.
#
# @param [String] job_id
# @param [String] format value of the 'format' request parameter
# @param [Boolean] autodecode inflate gzip / deflate encoded chunks before yielding
# @yield [chunk, current_total_chunk_size] chunk is the (possibly inflated)
#   data; current_total_chunk_size counts the bytes received before inflating
# @return [nil]
def job_result_download(job_id, format='msgpack', autodecode=true)
  client, header = new_client
  client.send_timeout = @send_timeout
  client.receive_timeout = @read_timeout
  header['Accept-Encoding'] = 'deflate, gzip'

  url = build_endpoint("/v3/job/result/#{e(job_id)}", @host)
  params = {'format' => format}

  unless ENV['TD_CLIENT_DEBUG'].nil?
    puts "DEBUG: REST GET call:"
    puts "DEBUG:   header: " + header.to_s
    puts "DEBUG:   url:    " + url.to_s
    puts "DEBUG:   params: " + params.to_s
  end

  retry_delay = @retry_delay
  cumul_retry_delay = 0
  current_total_chunk_size = 0
  infl = nil     # kept across retries so a resumed stream continues inflating
  last_res = nil # most recent response object seen by the streaming block
  begin # LOOP of Network/Server errors
    first_chunk_p = true
    response = client.get(url, params, header) do |res, chunk|
      last_res = res
      # Validate only on first chunk
      validate_response_status(res, current_total_chunk_size) if first_chunk_p
      first_chunk_p = false

      if infl.nil? && autodecode
        case res.header['Content-Encoding'][0].to_s.downcase
        when 'gzip'
          infl = Zlib::Inflate.new(Zlib::MAX_WBITS + 16)
        when 'deflate'
          infl = Zlib::Inflate.new
        end
      end
      current_total_chunk_size += chunk.bytesize
      chunk = infl.inflate(chunk) if infl
      yield chunk, current_total_chunk_size
    end

    # for the case response body is empty
    # Note that webmock returns response.body as "" instead of nil
    validate_response_status(response, 0) if response.body.to_s.empty?

    # completed?
    validate_content_length_with_range(response, current_total_chunk_size)
  rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Timeout::Error, EOFError,
    SystemCallError, OpenSSL::SSL::SSLError, SocketError, HTTPClient::TimeoutError,
    HTTPServerException => err
    if current_total_chunk_size > 0
      # `response` is assigned only after client.get returns, so it is still
      # nil (or left over from an earlier attempt) when the failure happened
      # mid-stream. Prefer the response seen inside the streaming block.
      resumable = last_res || response
      if resumable && (etag = resumable.header['ETag'][0])
        header['If-Range'] = etag
        header['Range'] = "bytes=#{current_total_chunk_size}-"
      end
    end

    $stderr.print "#{err.class}: #{err.message}. "
    if cumul_retry_delay < @max_cumul_retry_delay
      $stderr.puts "Retrying after #{retry_delay} seconds..."
      sleep retry_delay
      cumul_retry_delay += retry_delay
      retry_delay *= 2
      retry
    end
    raise
  end

  unless ENV['TD_CLIENT_DEBUG'].nil?
    puts "DEBUG: REST GET response:"
    puts "DEBUG:   header: " + response.header.to_s
    puts "DEBUG:   status: " + response.code.to_s
    puts "DEBUG:   body:   " + response.body.to_s
  end

  nil
ensure
  infl.close if infl
end
validate_content_length_with_range(response, current_total_chunk_size) click to toggle source
# File lib/td/client/api/job.rb, line 257
# Check that the number of bytes received matches what the server announced.
#
# The expected size is the trailing number of the Content-Range header when
# present, otherwise the Content-Length header. When neither is present
# nothing is checked.
#
# @param response object exposing header['Content-Range'] / header['Content-Length']
# @param [Integer] current_total_chunk_size bytes received so far
# @return [nil] when the sizes match or no size was announced
# @raise [EOFError] when fewer bytes than expected were received
def validate_content_length_with_range(response, current_total_chunk_size)
  expected =
    if (content_range = response.header['Content-Range'][0])
      content_range[/\d+$/].to_i
    elsif (content_length = response.header['Content-Length'][0])
      content_length.to_i
    end
  return if expected.nil?

  if current_total_chunk_size < expected
    # too small
    # NOTE:
    #   ext/openssl raises EOFError in case where underlying connection
    #   causes an error, but httpclient ignores it.
    #   https://github.com/nahi/httpclient/blob/v3.2.8/lib/httpclient/session.rb#L1003
    raise EOFError, 'httpclient IncompleteError'
  elsif current_total_chunk_size > expected
    # too large
    raise_error("Get job result failed", response)
  end
end
validate_response_status(res, current_total_chunk_size=0) click to toggle source
# File lib/td/client/api/job.rb, line 278
# Check the HTTP status of a job result response.
#
# 206 is always accepted (a resumed download). 200 is accepted only when
# nothing has been received yet: a 200 after a resume attempt means the server
# sent the full body again. 5xx raises HTTPServerException; any other status
# is reported through raise_error.
#
# @param res response object exposing #status
# @param [Integer] current_total_chunk_size bytes received so far
# @raise [HTTPServerException] on a 5xx status
def validate_response_status(res, current_total_chunk_size=0)
  status = res.status
  if status == 206
    nil # resuming
  elsif status == 200
    # try to resume but the server returns 200
    raise_error("Get job result failed", res) unless current_total_chunk_size == 0
  elsif status / 100 == 5
    raise HTTPServerException
  else
    raise_error("Get job result failed", res)
  end
end