class Mongo::BulkWrite

Constants

SINGLE_STATEMENT_OPS

Attributes

collection[R]

@return [ Mongo::Collection ] collection The collection.

options[R]

@return [ Hash, BSON::Document ] options The options.

requests[R]

@return [ Array<Hash, BSON::Document> ] requests The requests.

Public Class Methods

new(collection, requests, options = {}) click to toggle source

Create the new bulk write operation.

@api private

@example Create an ordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])

@example Create an unordered bulk write.

Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)

@example Create an ordered mixed bulk write.

Mongo::BulkWrite.new(
  collection,
  [
    { insert_one: { _id: 1 }},
    { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}},
    { delete_one: { filter: { _id: 2 }}}
  ]
)

@param [ Mongo::Collection ] collection The collection.

@param [ Array<Hash, BSON::Document> ] requests The requests.

@param [ Hash, BSON::Document ] options The options.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 119
# Build a bulk write for the given collection and list of requests.
#
# @param [ Mongo::Collection ] collection The collection.
# @param [ Array<Hash, BSON::Document> ] requests The requests.
# @param [ Hash, BSON::Document ] options The options; a nil (or false)
#   value is replaced by an empty hash.
def initialize(collection, requests, options = {})
  @collection, @requests = collection, requests
  # Callers may pass nil explicitly, so normalize to a hash here.
  @options = options
  @options ||= {}
end

Public Instance Methods

execute() click to toggle source

Execute the bulk write operation.

@example Execute the bulk write.

bulk_write.execute

@return [ Mongo::BulkWrite::Result ] The result.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 55
# Execute the bulk write operation.
#
# The requests are combined via +op_combiner.combine+ and each combined
# operation is run inside the session yielded by the client's
# +with_session+. Operations for which +single_statement?+ is true go
# through +write_with_retry+ and receive the yielded transaction number;
# all other operations go through +nro_write_with_retry+.
#
# @return [ Mongo::BulkWrite::Result ] The combined result, as returned by
#   the result combiner.
def execute
  operation_id = Monitoring.next_operation_id
  result_combiner = ResultCombiner.new
  operations = op_combiner.combine

  client.send(:with_session, @options) do |session|
    operations.each do |operation|
      # Resolve the write concern before branching. Previously a local named
      # +write_concern+ was assigned only in the single-statement branch, so
      # the other branch read that still-unassigned local (nil) instead of
      # the bulk write's write concern. A distinct local name also avoids
      # shadowing the #write_concern method.
      concern = write_concern(session)

      if single_statement?(operation)
        write_with_retry(session, concern) do |server, txn_num|
          server.with_connection do |connection|
            # The name/values are recomputed on every yield: split_execute
            # shifts elements off the values array, so a fresh flattened
            # array is needed each time this block runs.
            execute_operation(
              operation.keys.first,
              operation.values.flatten,
              connection,
              operation_id,
              result_combiner,
              session,
              txn_num)
          end
        end
      else
        nro_write_with_retry(session, concern) do |server|
          server.with_connection do |connection|
            execute_operation(
              operation.keys.first,
              operation.values.flatten,
              connection,
              operation_id,
              result_combiner,
              session)
          end
        end
      end
    end
  end
  result_combiner.result
end
ordered?() click to toggle source

Is the bulk write ordered?

@api private

@example Is the bulk write ordered?

bulk_write.ordered?

@return [ true, false ] If the bulk write is ordered.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 135
# Is the bulk write ordered?
#
# The answer is read from the :ordered option (defaulting to true) on the
# first call and cached afterwards. +defined?+ is used instead of +||=+ so
# that a false result is cached too; with +||=+ a false value is never
# retained and the options were consulted again on every call.
#
# @return [ true, false ] If the bulk write is ordered.
def ordered?
  return @ordered if defined?(@ordered)

  @ordered = options.fetch(:ordered, true)
end
write_concern(session = nil) click to toggle source

Get the write concern for the bulk write.

@api private

@example Get the write concern.

bulk_write.write_concern

@return [ WriteConcern ] The write concern.

@since 2.1.0

# File lib/mongo/bulk_write.rb, line 149
# Get the write concern for the bulk write.
#
# A :write_concern entry in the options takes precedence and is passed to
# WriteConcern.get; otherwise the collection is asked via
# +write_concern_with_session+. A truthy result is cached, so the session
# argument only matters on the call that computes the value.
#
# @param [ Object ] session The session forwarded to the collection when
#   no :write_concern option is present.
#
# @return [ WriteConcern ] The write concern.
def write_concern(session = nil)
  return @write_concern if @write_concern

  @write_concern =
    if options[:write_concern]
      WriteConcern.get(options[:write_concern])
    else
      collection.write_concern_with_session(session)
    end
end

Private Instance Methods

base_spec(operation_id, session) click to toggle source
# File lib/mongo/bulk_write.rb, line 165
# Build the spec hash shared by every operation of this bulk write.
#
# @param [ Object ] operation_id The operation id stored under :operation_id.
# @param [ Object ] session The session stored under :session and used to
#   resolve the write concern.
#
# @return [ Hash ] The spec with database/collection names, write concern,
#   ordering flag, operation id, options and session.
def base_spec(operation_id, session)
  spec = { db_name: database.name, coll_name: collection.name }
  spec[:write_concern] = write_concern(session)
  spec[:ordered] = ordered?
  spec[:operation_id] = operation_id
  # Coerce to a strict boolean regardless of what the option holds.
  spec[:bypass_document_validation] = options[:bypass_document_validation] ? true : false
  spec[:max_time_ms] = options[:max_time_ms]
  spec[:options] = options
  spec[:id_generator] = client.options[:id_generator]
  spec[:session] = session
  spec
end
delete_many(documents, connection, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 230
# Run a batch of delete_many statements.
#
# Clears the query cache for the collection's namespace, then bulk-executes
# an Operation::Delete built from the base spec plus the documents.
# +txn_num+ is accepted for signature parity with the other operation
# methods but is not placed in the spec.
#
# @return [ Object ] Whatever the operation's +bulk_execute+ returns.
def delete_many(documents, connection, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  Operation::Delete
    .new(base_spec(operation_id, session).merge(deletes: documents))
    .bulk_execute(connection, client: client)
end
delete_one(documents, connection, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 223
# Run a batch of delete_one statements.
#
# Clears the query cache for the collection's namespace, then bulk-executes
# an Operation::Delete built from the base spec plus the documents and the
# transaction number.
#
# @return [ Object ] Whatever the operation's +bulk_execute+ returns.
def delete_one(documents, connection, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  extras = { deletes: documents, txn_num: txn_num }
  Operation::Delete
    .new(base_spec(operation_id, session).merge(extras))
    .bulk_execute(connection, client: client)
end
execute_operation(name, values, connection, operation_id, result_combiner, session, txn_num = nil) click to toggle source
# File lib/mongo/bulk_write.rb, line 180
# Run one combined operation, splitting it when it is too large.
#
# Validates collation, array filters and hint support against the
# connection, then either dispatches to the method named by +name+ (via
# +send+) and feeds the result to the result combiner, or hands the values
# to +split_execute+ when there are more of them than the connection's
# max_write_batch_size.
#
# @param [ Symbol ] name The operation method to invoke.
# @param [ Array ] values The statements for the operation.
# @param [ Object ] txn_num Optional transaction number passed through.
def execute_operation(name, values, connection, operation_id, result_combiner, session, txn_num = nil)
  validate_collation!(connection)
  validate_array_filters!(connection)
  validate_hint!(connection)

  unpin_maybe(session) do
    if values.size > connection.description.max_write_batch_size
      split_execute(name, values, connection, operation_id, result_combiner, session, txn_num)
    else
      outcome = send(name, values, connection, operation_id, session, txn_num)

      add_server_diagnostics(connection) do
        add_error_labels(client, connection, session) do
          result_combiner.combine!(outcome, values.size)
        end
      end
    end
  end
# With OP_MSG (3.6+ servers), the size of each section in the message
# is independently capped at 16m and each bulk operation becomes
# its own section. The size of the entire bulk write is limited to 48m.
# With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a
# single document and is thus subject to the 16m document size limit.
# This means the splits differ between pre-3.6 and 3.6+ servers, with
# 3.6+ servers being able to split less.
rescue Error::MaxBSONSize, Error::MaxMessageSize => oversize
  # A single statement cannot be split any further, so surface the error.
  raise oversize unless values.size > 1

  unpin_maybe(session) do
    split_execute(name, values, connection, operation_id, result_combiner, session, txn_num)
  end
end
insert_one(documents, connection, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 237
# Run a batch of insert_one statements.
#
# Clears the query cache for the collection's namespace, then bulk-executes
# an Operation::Insert built from the base spec plus the documents and the
# transaction number.
#
# @return [ Object ] Whatever the operation's +bulk_execute+ returns.
def insert_one(documents, connection, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  extras = { documents: documents, txn_num: txn_num }
  Operation::Insert
    .new(base_spec(operation_id, session).merge(extras))
    .bulk_execute(connection, client: client)
end
op_combiner() click to toggle source
# File lib/mongo/bulk_write.rb, line 212
# The combiner for this bulk write's requests, created on first use.
#
# @return [ OrderedCombiner, UnorderedCombiner ] An ordered combiner when
#   +ordered?+ is truthy, an unordered one otherwise.
def op_combiner
  @op_combiner ||= begin
    combiner_class = ordered? ? OrderedCombiner : UnorderedCombiner
    combiner_class.new(requests)
  end
end
replace_one(documents, connection, operation_id, session, txn_num)
Alias for: update_one
single_statement?(operation) click to toggle source
# File lib/mongo/bulk_write.rb, line 161
# Whether the operation's name (its first key) is listed in
# SINGLE_STATEMENT_OPS.
#
# @param [ Hash ] operation The combined operation, keyed by its name.
#
# @return [ true, false ] If the operation name is in SINGLE_STATEMENT_OPS.
def single_statement?(operation)
  op_name = operation.keys.first
  SINGLE_STATEMENT_OPS.include?(op_name)
end
split_execute(name, values, connection, operation_id, result_combiner, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 216
# Execute the values in two halves.
#
# The first half is removed from +values+ in place (via +shift+) and
# executed; the remainder left in +values+ is executed afterwards. When a
# transaction number was supplied, the second half uses a fresh one
# obtained from +session.next_txn_num+.
#
# @return [ Object ] The result of executing the second half.
def split_execute(name, values, connection, operation_id, result_combiner, session, txn_num)
  leading = values.shift(values.size / 2)
  execute_operation(name, leading, connection, operation_id, result_combiner, session, txn_num)

  txn_num &&= session.next_txn_num
  execute_operation(name, values, connection, operation_id, result_combiner, session, txn_num)
end
update_many(documents, connection, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 252
# Run a batch of update_many statements.
#
# Clears the query cache for the collection's namespace, then bulk-executes
# an Operation::Update built from the base spec plus the documents.
# +txn_num+ is accepted for signature parity with the other operation
# methods but is not placed in the spec.
#
# @return [ Object ] Whatever the operation's +bulk_execute+ returns.
def update_many(documents, connection, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  Operation::Update
    .new(base_spec(operation_id, session).merge(updates: documents))
    .bulk_execute(connection, client: client)
end
update_one(documents, connection, operation_id, session, txn_num) click to toggle source
# File lib/mongo/bulk_write.rb, line 244
# Run a batch of update_one statements.
#
# Clears the query cache for the collection's namespace, then bulk-executes
# an Operation::Update built from the base spec plus the documents and the
# transaction number.
#
# @return [ Object ] Whatever the operation's +bulk_execute+ returns.
def update_one(documents, connection, operation_id, session, txn_num)
  QueryCache.clear_namespace(collection.namespace)

  extras = { updates: documents, txn_num: txn_num }
  Operation::Update
    .new(base_spec(operation_id, session).merge(extras))
    .bulk_execute(connection, client: client)
end
Also aliased as: replace_one
validate_array_filters!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 267
# Ensure array filters are only used against a connection that supports them.
#
# @param [ Object ] connection The connection whose features are checked.
#
# @raise [ Error::UnsupportedArrayFilters ] If the combined operations use
#   array filters and the connection's features do not enable them.
def validate_array_filters!(connection)
  return unless op_combiner.has_array_filters?
  return if connection.features.array_filters_enabled?

  raise Error::UnsupportedArrayFilters.new
end
validate_collation!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 261
# Ensure a collation is only used against a connection that supports it.
#
# @param [ Object ] connection The connection whose features are checked.
#
# @raise [ Error::UnsupportedCollation ] If the combined operations use a
#   collation and the connection's features do not enable it.
def validate_collation!(connection)
  return unless op_combiner.has_collation?
  return if connection.features.collation_enabled?

  raise Error::UnsupportedCollation.new
end
validate_hint!(connection) click to toggle source
# File lib/mongo/bulk_write.rb, line 273
# Ensure a hint is only used where it can be honoured.
#
# Nothing is checked unless the combined operations carry a hint. With a
# hint, an unacknowledged write concern is rejected first; otherwise the
# connection must report update/delete option validation as enabled.
#
# @param [ Object ] connection The connection whose features are checked.
#
# @raise [ Object ] The error built by Error::UnsupportedOption.hint_error.
def validate_hint!(connection)
  return unless op_combiner.has_hint?

  concern = write_concern
  if concern && !concern.acknowledged?
    raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
  end
  return if connection.features.update_delete_option_validation_enabled?

  raise Error::UnsupportedOption.hint_error
end