class SemanticLogger::Appender::Async
Allow any appender to run asynchronously in a separate thread.
Attributes
appender[R]
lag_check_interval[RW]
lag_threshold_s[RW]
max_queue_size[R]
queue[R]
Public Class Methods
new(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30)
Appender proxy to allow an existing appender to run asynchronously in a separate thread.
Parameters:
max_queue_size: [Integer]
  The maximum number of log messages to hold on the queue before blocking attempts to add to the queue.
  -1: The queue size is uncapped and will never block no matter how long the queue is.
  Default: 10,000
lag_threshold_s: [Float]
  Log a warning when a log message has been on the queue for longer than this period in seconds.
  Default: 30
lag_check_interval: [Integer]
  Number of messages to process before checking for slow logging.
  Default: 1,000
# File lib/semantic_logger/appender/async.rb, line 39
def initialize(appender:, max_queue_size: 10_000, lag_check_interval: 1_000, lag_threshold_s: 30)
  @appender           = appender
  @lag_check_interval = lag_check_interval
  @lag_threshold_s    = lag_threshold_s
  @thread             = nil
  @max_queue_size     = max_queue_size
  create_queue
  thread
end
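The proxy idea can be sketched with the standard library alone. The example below is a simplified illustration, not the library's implementation: `MemoryAppender` and `TinyAsyncProxy` are hypothetical names, and unlike the real class this sketch shuts down by closing the queue rather than by submitting a `:close` command.

```ruby
# Hypothetical stand-in for a real appender: collects messages in memory.
class MemoryAppender
  attr_reader :lines

  def initialize
    @lines = []
  end

  def log(message)
    @lines << message
  end
end

# Simplified sketch of an async proxy: callers enqueue, one worker thread writes.
class TinyAsyncProxy
  def initialize(appender:, max_queue_size: 10_000)
    @appender = appender
    # -1 selects an unbounded Queue; any other value selects a blocking SizedQueue.
    @queue    = max_queue_size == -1 ? Queue.new : SizedQueue.new(max_queue_size)
    @thread   = Thread.new { process }
  end

  # Blocks the caller only when a capped queue is full.
  def log(message)
    @queue << message
  end

  # Closing the queue lets the worker drain what is left and then exit.
  def close
    @queue.close
    @thread.join
  end

  private

  # Queue#pop returns nil once the queue is closed and empty, ending the loop.
  def process
    while (message = @queue.pop)
      @appender.log(message)
    end
  end
end

memory = MemoryAppender.new
proxy  = TinyAsyncProxy.new(appender: memory, max_queue_size: 2)
5.times { |i| proxy.log("message #{i}") }
proxy.close
```

With a cap of 2, the calling thread pauses whenever the worker falls two messages behind, which is the back-pressure that `max_queue_size` describes.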
Public Instance Methods
active?()
Returns true if the worker thread is active
# File lib/semantic_logger/appender/async.rb, line 81
def active?
  @thread&.alive?
end
capped?()
Returns [true|false] whether the queue has a capped size.
# File lib/semantic_logger/appender/async.rb, line 67
def capped?
  @capped
end
close()
Close all appenders and flush any outstanding messages.
# File lib/semantic_logger/appender/async.rb, line 97
def close
  # TODO: Prevent new close requests once this appender has been closed.
  submit_request(:close)
end
flush()
Flush all queued log entries to disk, database, etc.
All queued log messages are written and then each appender is flushed in turn.
# File lib/semantic_logger/appender/async.rb, line 92
def flush
  submit_request(:flush)
end
log(log)
Add log message for processing.
# File lib/semantic_logger/appender/async.rb, line 86
def log(log)
  queue << log
end
reopen()
Re-open appender after a fork
# File lib/semantic_logger/appender/async.rb, line 54
def reopen
  # Workaround CRuby crash on fork by recreating queue on reopen
  #   https://github.com/reidmorrison/semantic_logger/issues/103
  @queue&.close
  create_queue

  appender.reopen if appender.respond_to?(:reopen)

  @thread.kill if @thread&.alive?
  @thread = Thread.new { process }
end
thread()
Returns [Thread] the worker thread.
Starts the worker thread if not running.
# File lib/semantic_logger/appender/async.rb, line 74
def thread
  return @thread if @thread&.alive?

  @thread = Thread.new { process }
end
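The "start if not running" behaviour can be illustrated in isolation. This is a minimal sketch with a hypothetical `LazyWorker` class whose thread simply sleeps; the `starts` counter exists only to make the behaviour observable.

```ruby
# Hypothetical class showing the lazy-start accessor pattern.
class LazyWorker
  attr_reader :starts

  def initialize
    @thread = nil
    @starts = 0
  end

  # Returns the live worker thread, creating a new one only when needed.
  def thread
    return @thread if @thread&.alive?

    @starts += 1
    @thread = Thread.new { sleep } # placeholder for the real processing loop
  end
end

worker = LazyWorker.new
first  = worker.thread   # starts the thread
same   = worker.thread   # thread is alive, so it is reused

first.kill
first.join               # wait until the thread has really stopped

replacement = worker.thread # dead thread is replaced with a new one
replacement.kill
```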
Private Instance Methods
check_lag(log)
# File lib/semantic_logger/appender/async.rb, line 182
def check_lag(log)
  diff = Time.now - log.time
  return unless diff > lag_threshold_s

  logger.warn "Async: Appender thread has fallen behind by #{diff} seconds with #{queue.size} messages queued up. Consider reducing the log level or changing the appenders"
end
create_queue()
# File lib/semantic_logger/appender/async.rb, line 104
def create_queue
  if max_queue_size == -1
    @queue  = Queue.new
    @capped = false
  else
    @queue  = SizedQueue.new(max_queue_size)
    @capped = true
  end
end
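The difference between the two queue types can be seen with plain Ruby. The sketch below uses a non-blocking push only to make the "full" condition visible without stalling the example; the appender itself uses the blocking `<<`.

```ruby
# A capped queue holds at most two entries in this example.
capped = SizedQueue.new(2)
capped << :a
capped << :b

# A non-blocking push raises ThreadError when the queue is full.
# (A blocking push, as used by `queue << log`, would wait instead.)
overflowed =
  begin
    capped.push(:c, true)
    false
  rescue ThreadError
    true
  end

# An uncapped queue accepts entries without ever blocking the producer.
uncapped = Queue.new
1_000.times { |i| uncapped << i }
```

An uncapped queue trades that back-pressure for unbounded memory growth if the appender cannot keep up.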
process()
Separate thread for batching up log messages before writing.
# File lib/semantic_logger/appender/async.rb, line 115
def process
  # This thread is designed to never go down unless the main thread terminates
  # or the appender is closed.
  Thread.current.name = logger.name
  logger.trace "Async: Appender thread active"
  begin
    process_messages
  rescue StandardError => e
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.error("Async: Restarting due to exception", e)
    rescue StandardError
      nil
    end
    retry
  rescue Exception => e
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.error("Async: Stopping due to fatal exception", e)
    rescue StandardError
      nil
    end
  ensure
    @thread = nil
    # This block may be called after the file handles have been released by Ruby
    begin
      logger.trace("Async: Thread has stopped")
    rescue StandardError
      nil
    end
  end
end
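The restart-on-error structure relies on Ruby's `retry` keyword, which re-runs the `begin` body from a `rescue` clause. A minimal sketch of that pattern follows, with a simulated failure standing in for a real appender error; the `attempts` counter and `events` array are illustrative only.

```ruby
attempts = 0
events   = []

worker = Thread.new do
  begin
    attempts += 1
    # Simulated failure: the first two attempts raise a recoverable error.
    raise IOError, "simulated write failure" if attempts < 3

    events << "processed on attempt #{attempts}"
  rescue StandardError => e
    # Recoverable errors restart the processing loop.
    events << "restarting after #{e.class}"
    retry
  ensure
    # Runs when the thread finally leaves the block.
    events << "stopped"
  end
end
worker.join
```

Exceptions that are not `StandardError` subclasses are not retried in the original method, so the thread stops on them rather than looping.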
process_message(message)
Returns false when message processing should be stopped
# File lib/semantic_logger/appender/async.rb, line 167
def process_message(message)
  case message[:command]
  when :flush
    appender.flush
    message[:reply_queue] << true if message[:reply_queue]
  when :close
    appender.close
    message[:reply_queue] << true if message[:reply_queue]
    return false
  else
    logger.warn "Async: Appender thread: Ignoring unknown command: #{message[:command]}"
  end
  true
end
process_messages()
# File lib/semantic_logger/appender/async.rb, line 148
def process_messages
  count = 0
  while (message = queue.pop)
    if message.is_a?(Log)
      appender.log(message)
      count += 1
      # Check every few log messages whether this appender thread is falling behind
      if count > lag_check_interval
        check_lag(message)
        count = 0
      end
    else
      break unless process_message(message)
    end
  end
  logger.trace "Async: Queue Closed"
end
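The sampling logic can be tried out on its own. In this sketch, `LogEntry` and `stale_entries` are hypothetical names, and the current time is passed in so the result is deterministic. Because the comparison is `count > lag_check_interval`, an interval of 3 samples every fourth entry.

```ruby
# Hypothetical log record with just the fields the lag check needs.
LogEntry = Struct.new(:message, :time)

# Returns the entries that a periodic lag check would have flagged.
def stale_entries(entries, lag_check_interval:, lag_threshold_s:, now: Time.now)
  count   = 0
  flagged = []
  entries.each do |entry|
    count += 1
    # Only inspect an entry once the counter exceeds the interval.
    next unless count > lag_check_interval

    flagged << entry if now - entry.time > lag_threshold_s
    count = 0
  end
  flagged
end

now     = Time.now
entries = Array.new(10) { |i| LogEntry.new("message #{i}", now - 60) }
flagged = stale_entries(entries, lag_check_interval: 3, lag_threshold_s: 30, now: now)
```

Sampling keeps the cost of calling `Time.now` off the hot path, at the price of noticing lag only at the sampled messages.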
submit_request(command)
Submit command and wait for reply
# File lib/semantic_logger/appender/async.rb, line 190
def submit_request(command)
  return false unless active?

  queue_size = queue.size
  msg        = "Async: Queued log messages: #{queue_size}, running command: #{command}"
  if queue_size > 1_000
    logger.warn msg
  elsif queue_size > 100
    logger.info msg
  elsif queue_size.positive?
    logger.trace msg
  end

  reply_queue = Queue.new
  queue << {command: command, reply_queue: reply_queue}
  reply_queue.pop
end
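The request and reply handshake can be reproduced with two plain queues. In this simplified sketch, strings stand in for log entries, the `handled` array stands in for the appender, and the `submit` lambda is a hypothetical stand-in for `submit_request`.

```ruby
queue   = Queue.new
handled = []

# Worker: log entries and commands share one queue, so a command is only
# seen after every entry queued before it has been processed.
worker = Thread.new do
  while (message = queue.pop)
    if message.is_a?(Hash)
      handled << message[:command]
      message[:reply_queue] << true if message[:reply_queue]
      break if message[:command] == :close
    else
      handled << message
    end
  end
end

# Caller side: enqueue the command, then block until the worker replies.
submit = lambda do |command|
  reply_queue = Queue.new
  queue << {command: command, reply_queue: reply_queue}
  reply_queue.pop
end

queue << "first log line"
queue << "second log line"
flushed = submit.call(:flush)
closed  = submit.call(:close)
worker.join
```

Giving each request its own reply queue means concurrent callers never receive each other's acknowledgements.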