class Pwwka::MessageQueuer

Queue messages for sending in a batch Primarily used when multiple messages need to sent from within a transaction block

Example:

# instantiate a message_queuer object
message_queuer  = MessageQueuerService.new
ActiveRecord::Base.transaction do
  # do a thing, then queue message
  message_queuer.queue_message(payload: {this: 'that'}, routing_key: 'go.to.there')

  # do another thing, then queue a delayed message
  message_queuer.queue_message(payload: {the: 'other'}, routing_key: 'go.somewhere.else', delayed: true, delay_by: 3000)
end
# send the queued messages if we make it out of the transaction alive
message_queuer.send_messages_safely

Attributes

message_queue[R]

Public Class Methods

new() click to toggle source
# File lib/pwwka/message_queuer.rb, line 27
def initialize()
  @message_queue  = []
end

Public Instance Methods

clear_messages() click to toggle source
# File lib/pwwka/message_queuer.rb, line 57
def clear_messages
  @message_queue.clear
end
queue_message(payload: nil, routing_key: nil, delayed: false, delay_by: nil) click to toggle source
# File lib/pwwka/message_queuer.rb, line 31
def queue_message(payload: nil, routing_key: nil, delayed: false, delay_by: nil)
  raise 'Missing payload' if payload.nil?
  raise 'Missing routing_key' if routing_key.nil?
  message_queue.push({
        payload: payload,
    routing_key: routing_key,
        delayed: delayed,
       delay_by: delay_by
  })
end
send_messages!() click to toggle source
# File lib/pwwka/message_queuer.rb, line 50
def send_messages!
  message_queue.each do |message|
    send_message!(*message_arguments(message))
  end
  clear_messages
end
send_messages_safely() click to toggle source
# File lib/pwwka/message_queuer.rb, line 42
def send_messages_safely
  message_queue.each do |message|
    delay_hash  = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v}
    send_message_safely(*message_arguments(message))
  end
  clear_messages
end

Private Instance Methods

message_arguments(message) click to toggle source
# File lib/pwwka/message_queuer.rb, line 62
def message_arguments(message)
  delay_hash  = {delayed: message[:delayed], delay_by: message[:delay_by]}.delete_if{|_,v|!v}
  [message[:payload], message[:routing_key], (delay_hash.any? ? delay_hash : nil)].compact
end