module ShadowsocksRuby::Connections::Connection
Mixed-in code that gives EventMachine::Connection a fiber-enabled asynchronous receive function and a pressure-controlled send_data.
User code should define process_hook, which is expected to implement a state machine.
Note: User code should not override post_init or receive_data; this is by design.
@example
  class DummyConnection < EventMachine::Connection
    include ShadowsocksRuby::Connections::Connection
    def process_hook
      @i ||= 0
      @i += 1
      puts "I'm now in a fiber enabled context: #{@fiber}"
      Fiber.yield if @i >= 3
    end
  end
Constants
- PressureLevel
512K. The plexer is paused when this connection's get_outbound_data_size reaches this value (see pressure_control).
Attributes
- logger
The logger can be set in test code.
- plexer
The connection to which the peer's traffic is relayed. For a server connection, the plexer is the backend connection. For a backend connection, the plexer is the server connection.
@return [Connection]
Public Instance Methods
Asynchronously receive n bytes from @buffer.
@param [Integer] n Bytes to receive; if n = -1, returns all data in @buffer
@return [String] The n bytes of data received
# File lib/shadowsocks_ruby/connections/connection.rb, line 95
def async_recv n
  # wait n bytes
  if n == -1 && @buffer.size == 0 || @buffer.size < n
    @wait_length = n
    Fiber.yield
  end
  # read n bytes from buffer
  if n == -1
    s, @buffer = @buffer, String.new('', encoding: Encoding::ASCII_8BIT)
    return s
  else
    return @buffer.slice!(0, n)
  end
end
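The wait-then-slice pattern in async_recv can be tried outside EventMachine. The following is a minimal sketch, assuming a hypothetical `FiberBuffer` class whose `feed` method stands in for receive_data; neither name is part of the library, and only Ruby's core Fiber is used.

```ruby
# Hypothetical stand-in for a connection (not part of shadowsocks_ruby).
class FiberBuffer
  def initialize(&consumer)
    @buffer = String.new('', encoding: Encoding::ASCII_8BIT)
    @wait_length = 0
    # The consumer block runs inside the fiber and may call async_recv.
    @fiber = Fiber.new { consumer.call(self) }
    @fiber.resume
  end

  # Plays the role of receive_data: append, then resume once enough data arrived.
  def feed(data)
    @buffer << data
    return unless @fiber.alive?
    @fiber.resume if @wait_length == -1 || @buffer.size >= @wait_length
  end

  # Same shape as async_recv in the listing above.
  def async_recv(n)
    if (n == -1 && @buffer.empty?) || @buffer.size < n
      @wait_length = n
      Fiber.yield
    end
    if n == -1
      s, @buffer = @buffer, String.new('', encoding: Encoding::ASCII_8BIT)
      s
    else
      @buffer.slice!(0, n)
    end
  end
end

received = []
conn = FiberBuffer.new do |c|
  received << c.async_recv(4)   # suspends until 4 bytes are buffered
  received << c.async_recv(-1)  # returns whatever is buffered, waiting only if empty
end
conn.feed("ab")    # only 2 bytes buffered, the fiber stays suspended
conn.feed("cdef")  # 6 bytes buffered, the fiber resumes and reads twice
```

After the second `feed`, `received` holds the first four bytes followed by the remaining two. The caller of `async_recv` reads as straight-line code even though control returns to the event loop between chunks.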
Asynchronously receive data until str (e.g. "\r\n\r\n") appears.
@param [String] str Desired ending string
@raise BufferOversizeError raised if str cannot be found in the first 65536 bytes (64K bytes) of @buffer, which is enough for an HTTP request head
@return [String] Returned data, with str at the end
# File lib/shadowsocks_ruby/connections/connection.rb, line 115
def async_recv_until str
  # wait for str
  pos = @buffer =~ Regexp.new(str)
  while pos == nil
    @wait_length = -1
    Fiber.yield
    pos = @buffer =~ Regexp.new(str)
    raise BufferOversizeError, "oversized async_recv_until read" if @buffer.size > 65536
  end
  # read until str from buffer
  return @buffer.slice!(0, pos + str.length)
end
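Because the listing builds its pattern with `Regexp.new(str)`, any regexp metacharacters in `str` are read as pattern syntax rather than literal text. This makes no difference for a delimiter such as "\r\n\r\n", but it may matter for others. Below is a minimal sketch of a literal search with `String#index`; the helper name `take_until`, its `limit` parameter, and the use of `ArgumentError` are assumptions for illustration, not the library's API, and the oversize check here only fires when the delimiter is absent.

```ruby
# Hypothetical helper: remove and return everything up to and including the
# first literal occurrence of delimiter, or nil if it has not arrived yet.
def take_until(buffer, delimiter, limit = 65_536)
  pos = buffer.index(delimiter)
  if pos.nil?
    # Stand-in for the library's BufferOversizeError.
    raise ArgumentError, "delimiter not found within #{limit} bytes" if buffer.bytesize > limit
    return nil
  end
  buffer.slice!(0, pos + delimiter.length)
end

buffer = String.new("GET / HTTP/1.1\r\nHost: example.com\r\n\r\nbody",
                    encoding: Encoding::ASCII_8BIT)
head = take_until(buffer, "\r\n\r\n")
# head ends with the delimiter; buffer now holds only the bytes after it.

# Regexp.new treats "." as "any character"; String#index treats it literally.
regexp_pos  = ("abc" =~ Regexp.new("."))
literal_pos = "abc".index(".")
```

If regexp matching is preferred, passing the delimiter through `Regexp.escape` before `Regexp.new` is another way to get a literal match.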
Get the logger object. The default logger is App.instance.logger.
# File lib/shadowsocks_ruby/connections/connection.rb, line 52
def logger
  @logger ||= App.instance.logger
end
# File lib/shadowsocks_ruby/connections/connection.rb, line 84
def peer
  @peer ||= begin
    port, ip = Socket.unpack_sockaddr_in(get_peername)
    "#{ip}:#{port}"
  end
end
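The peer method relies on `Socket.unpack_sockaddr_in` returning the port first and the address second. A small sketch of that round trip without a live connection, where `Socket.pack_sockaddr_in` stands in for what get_peername would return and the address and port are made-up illustration values:

```ruby
require 'socket'

# Build a packed sockaddr similar to what get_peername would hand back.
sockaddr = Socket.pack_sockaddr_in(8388, "127.0.0.1")

# unpack_sockaddr_in returns [port, ip], in that order.
port, ip = Socket.unpack_sockaddr_in(sockaddr)
peer = "#{ip}:#{port}"
```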
Initialize a fiber context and enter the process loop. Normally, a child class should not override post_init; this is by design.
@private
# File lib/shadowsocks_ruby/connections/connection.rb, line 73
def post_init
  @buffer = String.new('', encoding: Encoding::ASCII_8BIT)
  @fiber = Fiber.new do
    # poor man's state machine
    while true
      process
    end
  end
  @fiber.resume
end
If the peer is receiving data too slowly, pause the plexer so that it stops sending data to this connection, which keeps memory usage from growing too high.
@private
# File lib/shadowsocks_ruby/connections/connection.rb, line 159
def pressure_control
  @plexer ||= nil
  if @plexer != nil
    if get_outbound_data_size >= PressureLevel
      @plexer.pause unless @plexer.paused?
      EventMachine.next_tick self.method(:pressure_control)
    else
      @plexer.resume if @plexer.paused?
    end
  end
end
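The pause and resume decision in pressure_control can be followed with plain objects. This is a rough sketch under stated assumptions: `FakePlexer` and `pressure_check` are hypothetical names, the default `level` reads "512K" as 512 * 1024 bytes, and the return value stands in for the `EventMachine.next_tick` rescheduling that the real method performs.

```ruby
# Hypothetical stand-in for the plexer connection (not part of the library).
class FakePlexer
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# One pass of the check. Returns true when another check should be scheduled,
# which the real code does through EventMachine.next_tick.
def pressure_check(outbound_size, plexer, level = 512 * 1024)
  if outbound_size >= level
    plexer.pause unless plexer.paused?
    true
  else
    plexer.resume if plexer.paused?
    false
  end
end

plexer = FakePlexer.new
states = []
rechecks = []
# Simulated outbound queue sizes over three successive ticks.
[600 * 1024, 520 * 1024, 100 * 1024].each do |size|
  rechecks << pressure_check(size, plexer)
  states << plexer.paused?
end
```

The plexer stays paused while the simulated queue is at or above the threshold and is resumed on the first check that finds it below.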
Call process_hook, which should be defined in user code.
@private
# File lib/shadowsocks_ruby/connections/connection.rb, line 66
def process
  process_hook
end
Provides fiber-enabled data receiving; it should always be called in a fiber context.
Normally, a client class should not call receive_data directly and should call async_recv or async_recv_until instead.
@param [String] data
@raise OutOfFiberContextError
@private
# File lib/shadowsocks_ruby/connections/connection.rb, line 139
def receive_data data
  if @fiber.alive?
    @buffer << data
    if @wait_length == -1 || @buffer.size >= @wait_length
      @fiber.resume
    end
  else
    raise OutOfFiberContextError, "should not go here"
  end
rescue MyErrorModule => e
  logger.info {e.message}
  close_connection
rescue Exception => e
  logger.info {e.class.to_s + " " + e.message + e.backtrace.join("\n")}
  close_connection
end
Close plexer first if it exists
# File lib/shadowsocks_ruby/connections/connection.rb, line 172
def unbind
  @plexer ||= nil
  @plexer.close_connection_after_writing if @plexer != nil
end