libcaf
0.17.1
A cooperatively scheduled, event-based actor implementation.
#include <scheduled_actor.hpp>
Classes | |
struct | mailbox_policy |
Configures the FIFO inbox with four nested queues: normal, urgent, upstream, and downstream messages. | |
struct | mailbox_visitor |
Consumes messages from the mailbox. | |
Public Types | |
enum | message_category { message_category::ordinary, message_category::internal, message_category::skipped } |
Categorizes incoming messages. | |
enum | activation_result { activation_result::success, activation_result::terminated, activation_result::skipped, activation_result::dropped } |
Result of one-shot activations. | |
using | super = local_actor |
Base type. | |
using | stream_manager_map = std::map< stream_slot, stream_manager_ptr > |
Maps slot IDs to stream managers. | |
using | normal_queue = intrusive::drr_cached_queue< policy::normal_messages > |
Stores asynchronous messages with default priority. | |
using | urgent_queue = intrusive::drr_cached_queue< policy::urgent_messages > |
Stores asynchronous messages with high priority. | |
using | upstream_queue = intrusive::drr_queue< policy::upstream_messages > |
Stores upstream messages. | |
using | downstream_queue = intrusive::wdrr_dynamic_multiplexed_queue< policy::downstream_messages > |
Stores downstream messages. | |
using | mailbox_type = intrusive::fifo_inbox< mailbox_policy > |
A queue optimized for single-reader-many-writers. | |
using | pending_response = std::pair< const message_id, behavior > |
The message ID of an outstanding response with its callback. | |
using | pointer = scheduled_actor * |
A pointer to a scheduled actor. | |
using | default_handler = std::function< result< message >(pointer, message_view &)> |
Function object for handling unmatched messages. | |
using | error_handler = std::function< void(pointer, error &)> |
Function object for handling error messages. | |
using | down_handler = std::function< void(pointer, down_msg &)> |
Function object for handling down messages. | |
using | exit_handler = std::function< void(pointer, exit_msg &)> |
Function object for handling exit messages. | |
using | exception_handler = std::function< error(pointer, std::exception_ptr &)> |
Function object for handling exceptions. | |
Public Types inherited from caf::resumable | |
enum | resume_result { resume_later, awaiting_message, done, shutdown_execution_unit } |
Denotes the state in which a resumable returned from its last call to resume. | |
enum | subtype_t { unspecified, scheduled_actor, io_actor, function_object } |
Denotes common subtypes of resumable. | |
Public Member Functions | |
scheduled_actor (actor_config &cfg) | |
void | enqueue (mailbox_element_ptr ptr, execution_unit *eu) override |
mailbox_element * | peek_at_next_mailbox_element () override |
const char * | name () const override |
void | launch (execution_unit *eu, bool lazy, bool hide) override |
bool | cleanup (error &&fail_state, execution_unit *host) override |
subtype_t | subtype () const override |
Returns a subtype hint for this object. | |
void | intrusive_ptr_add_ref_impl () override |
Add a strong reference count to this object. | |
void | intrusive_ptr_release_impl () override |
Remove a strong reference count from this object. | |
resume_result | resume (execution_unit *, size_t) override |
Resume any pending computation until it is either finished or needs to be re-scheduled later. | |
virtual proxy_registry * | proxy_registry_ptr () |
Returns a factory for proxies created and managed by this actor or nullptr. | |
void | quit (error x=error{}) |
Finishes execution of this actor after any currently running message handler is done. | |
mailbox_type & | mailbox () noexcept |
Returns the queue for storing incoming messages. | |
stream_manager_map & | stream_managers () noexcept |
Returns map for all active streams. | |
stream_manager_map & | pending_stream_managers () noexcept |
Returns map for all pending streams. | |
void | set_default_handler (default_handler fun) |
Sets a custom handler for unexpected messages. | |
template<class F > | |
std::enable_if < std::is_convertible< F, std::function< result< message >(type_erased_tuple &)> >::value > ::type | set_default_handler (F fun) |
Sets a custom handler for unexpected messages. | |
void | set_error_handler (error_handler fun) |
Sets a custom handler for error messages. | |
template<class T > | |
auto | set_error_handler (T fun) -> decltype(fun(std::declval< error & >())) |
Sets a custom handler for error messages. | |
void | set_down_handler (down_handler fun) |
Sets a custom handler for down messages. | |
template<class T > | |
auto | set_down_handler (T fun) -> decltype(fun(std::declval< down_msg & >())) |
Sets a custom handler for down messages. | |
void | set_exit_handler (exit_handler fun) |
Sets a custom handler for exit messages. | |
template<class T > | |
auto | set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg & >())) |
Sets a custom handler for exit messages. | |
void | set_exception_handler (exception_handler fun) |
Sets a custom exception handler for this actor. | |
template<class F > | |
std::enable_if < std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type | set_exception_handler (F f) |
Sets a custom exception handler for this actor. | |
template<class Driver , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t> | |
make_source_result_t< typename Driver::downstream_manager_type, Ts...> | make_source (std::tuple< Ts...> xs, Init init, Pull pull, Done done, Finalize fin={}) |
Creates a new stream source by instantiating the default source implementation with Driver. | |
template<class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>> | |
make_source_result_t < DownstreamManager, Ts...> | make_source (std::tuple< Ts...> xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
Creates a new stream source from given arguments. | |
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t <!is_actor_handle< Init > ::value &&Trait::valid, make_source_result_t < DownstreamManager > > | make_source (Init init, Pull pull, Done done, Finalize finalize={}, policy::arg< DownstreamManager > token={}) |
template<class ActorHandle , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t < is_actor_handle< ActorHandle > ::value, make_source_result_t < DownstreamManager > > | make_source (const ActorHandle &dest, std::tuple< Ts...> xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
Creates a new stream source and adds dest as first outbound path to it. | |
template<class ActorHandle , class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>> | |
detail::enable_if_t < is_actor_handle< ActorHandle > ::value &&Trait::valid, make_source_result_t < DownstreamManager > > | make_source (const ActorHandle &dest, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
Creates a new stream source and adds dest as first outbound path to it. | |
template<class Driver , class Init , class Pull , class Done , class Finalize = unit_t> | |
Driver::source_ptr_type | make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}) |
Creates a new continuous stream source by instantiating the default source implementation with Driver. | |
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>> | |
stream_source_ptr < DownstreamManager > | make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={}) |
Creates a new continuous stream source by instantiating the default source implementation with Driver. | |
template<class Driver , class... Ts> | |
make_sink_result< typename Driver::input_type > | make_sink (const stream< typename Driver::input_type > &src, Ts &&...xs) |
template<class In , class Init , class Fun , class Finalize = unit_t, class Trait = stream_sink_trait_t<Fun>> | |
make_sink_result< In > | make_sink (const stream< In > &in, Init init, Fun fun, Finalize fin={}) |
template<class Driver , class In , class... Ts, class... Us> | |
make_stage_result_t< In, typename Driver::downstream_manager_type, Ts...> | make_stage (const stream< In > &src, std::tuple< Ts...> xs, Us &&...ys) |
template<class In , class... Ts, class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
make_stage_result_t< In, DownstreamManager, Ts...> | make_stage (const stream< In > &in, std::tuple< Ts...> xs, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
template<class In , class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
make_stage_result_t< In, DownstreamManager > | make_stage (const stream< In > &in, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={}) |
template<class Driver , class... Ts> | |
Driver::stage_ptr_type | make_continuous_stage (Ts &&...xs) |
Returns a stream manager (implementing a continuous stage) without in- or outbound path. | |
template<class Init , class Fun , class Cleanup , class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>> | |
stream_stage_ptr< typename Trait::input, DownstreamManager > | make_continuous_stage (Init init, Fun fun, Cleanup cleanup, policy::arg< DownstreamManager > token={}) |
Static Public Member Functions | |
static void | default_error_handler (pointer ptr, error &x) |
static void | default_down_handler (pointer ptr, down_msg &x) |
static void | default_exit_handler (pointer ptr, exit_msg &x) |
static error | default_exception_handler (pointer ptr, std::exception_ptr &x) |
Protected Attributes | |
mailbox_type | mailbox_ |
Stores incoming messages. | |
detail::behavior_stack | bhvr_stack_ |
Stores user-defined callbacks for message handling. | |
uint64_t | timeout_id_ |
Identifies the timeout messages we are currently waiting for. | |
std::forward_list < pending_response > | awaited_responses_ |
Stores callbacks for awaited responses. | |
detail::unordered_flat_map < message_id, behavior > | multiplexed_responses_ |
Stores callbacks for multiplexed responses. | |
default_handler | default_handler_ |
Customization point for setting a default message callback. | |
error_handler | error_handler_ |
Customization point for setting a default error callback. | |
down_handler | down_handler_ |
Customization point for setting a default down_msg callback. | |
exit_handler | exit_handler_ |
Customization point for setting a default exit_msg callback. | |
stream_manager_map | stream_managers_ |
Stores stream managers for established streams. | |
stream_manager_map | pending_stream_managers_ |
Stores stream managers for pending streams, i.e., streams that have not yet received an ACK. | |
detail::tick_emitter | stream_ticks_ |
Controls batch and credit timeouts. | |
size_t | max_batch_delay_ticks_ |
Number of ticks per batch delay. | |
size_t | credit_round_ticks_ |
Number of ticks of each credit round. | |
detail::private_thread * | private_thread_ |
Pointer to a private thread object associated with a detached actor. | |
exception_handler | exception_handler_ |
Customization point for setting a default exception callback. | |
Related Functions | |
(Note that these are not member functions.) | |
result< message > | reflect (scheduled_actor *, message_view &) |
result< message > | reflect_and_quit (scheduled_actor *, message_view &) |
result< message > | print_and_drop (scheduled_actor *, message_view &) |
result< message > | drop (scheduled_actor *, message_view &) |
A cooperatively scheduled, event-based actor implementation.
Driver::source_ptr_type caf::scheduled_actor::make_continuous_source(Init init, Pull pull, Done done, Finalize fin = {})
Creates a new continuous stream source by instantiating the default source implementation with Driver.
The returned manager is not connected to any slot and thus not stored by the actor automatically.
stream_source_ptr<DownstreamManager> caf::scheduled_actor::make_continuous_source(Init init, Pull pull, Done done, Finalize fin = {}, policy::arg<DownstreamManager> = {})
Creates a new continuous stream source by instantiating the default source implementation with Driver.
The returned manager is not connected to any slot and thus not stored by the actor automatically.
Driver::stage_ptr_type caf::scheduled_actor::make_continuous_stage(Ts&&... xs)
Returns a stream manager (implementing a continuous stage) without in- or outbound path.
The returned manager is not connected to any slot and thus not stored by the actor automatically.
make_source_result_t<typename Driver::downstream_manager_type, Ts...> caf::scheduled_actor::make_source(std::tuple<Ts...> xs, Init init, Pull pull, Done done, Finalize fin = {})
Creates a new stream source by instantiating the default source implementation with Driver.
xs | User-defined handshake payload. |
init | Function object for initializing the state of the source. |
pull | Generator function object for producing downstream messages. |
done | Predicate returning true when generator is done. |
fin | Cleanup handler. |
Returns the stream_manager and the output slot.
make_source_result_t<DownstreamManager, Ts...> caf::scheduled_actor::make_source(std::tuple<Ts...> xs, Init init, Pull pull, Done done, Finalize fin = {}, policy::arg<DownstreamManager> = {})
Creates a new stream source from given arguments.
xs | User-defined handshake payload. |
init | Function object for initializing the state of the source. |
pull | Generator function object for producing downstream messages. |
done | Predicate returning true when generator is done. |
fin | Cleanup handler. |
Returns the stream_manager and the output slot.
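The roles of init, pull, done, and fin described above can be sketched with a standard-library-only toy driver. This is not CAF's implementation: `run_source` and `count_up_to` are hypothetical names, a `std::vector` stands in for the downstream buffer, and the handshake payload xs, flow control, and the error argument a real cleanup handler may receive are all omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a source driver loop (NOT the CAF implementation).
// Calls init once, calls pull until done(state) returns true, then calls fin.
template <class State, class Out, class Init, class Pull, class Done,
          class Finalize>
std::vector<Out> run_source(Init init, Pull pull, Done done, Finalize fin,
                            std::size_t batch_size) {
  State state{};
  std::vector<Out> sink; // stand-in for the downstream buffer
  init(state);
  while (!done(state))
    pull(state, sink, batch_size);
  fin(state);
  return sink;
}

// Example generator: emits the integers [0, limit) in batches of up to three.
inline std::vector<int> count_up_to(int limit) {
  return run_source<int, int>(
    // init: set the state to its starting value.
    [](int& x) { x = 0; },
    // pull: produce at most num elements per call.
    [limit](int& x, std::vector<int>& out, std::size_t num) {
      for (std::size_t i = 0; i < num && x < limit; ++i)
        out.push_back(x++);
    },
    // done: true once the generator is exhausted.
    [limit](const int& x) { return x >= limit; },
    // fin: nothing to clean up in this toy example.
    [](int&) {},
    3);
}
```

In this sketch, `count_up_to(5)` yields 0 through 4, and a non-positive limit yields an empty sequence because done already holds after init.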
proxy_registry* caf::scheduled_actor::proxy_registry_ptr() [virtual]
Returns a factory for proxies created and managed by this actor or nullptr.
Reimplemented in caf::io::basp_broker.
void caf::scheduled_actor::quit(error x = error{})
Finishes execution of this actor after any currently running message handler is done.
This member function clears the behavior stack of the running actor and invokes on_exit(). The actor does not finish execution if the implementation of on_exit() sets a new behavior. When setting a new behavior in on_exit(), one has to make sure not to produce an infinite recursion.
If on_exit() did not set a new behavior, the actor sends an exit message to all of its linked actors, sets its state to exited and finishes execution.
In case this actor uses the blocking API, this member function unwinds the stack by throwing an actor_exited exception.
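The control flow described for quit() can be modeled with a small standard-library-only sketch. Everything here is a hypothetical stand-in (`toy_actor`, its members, and the two helper functions); it only mirrors the documented order of steps and leaves out the blocking-API case and real message delivery.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Toy model of the documented quit() steps (NOT the CAF implementation).
struct toy_actor {
  using behavior = std::function<void(int)>;
  std::vector<behavior> bhvr_stack;        // stand-in for the behavior stack
  std::function<void(toy_actor&)> on_exit; // user hook, may push a behavior
  std::size_t links = 0;                   // number of linked actors
  std::size_t exit_messages_sent = 0;      // exit messages "sent" to links
  bool exited = false;

  void quit() {
    bhvr_stack.clear(); // step 1: clear the behavior stack
    if (on_exit)
      on_exit(*this);   // step 2: invoke on_exit()
    if (!bhvr_stack.empty())
      return;           // on_exit() set a new behavior: keep running
    exit_messages_sent += links; // step 3: notify linked actors
    exited = true;               // step 4: mark as exited
  }
};

// Without a hook that installs a behavior, the actor exits and notifies links.
inline bool quits_without_new_behavior() {
  toy_actor a;
  a.links = 2;
  a.bhvr_stack.push_back([](int) {});
  a.quit();
  return a.exited && a.exit_messages_sent == 2;
}

// If on_exit() installs a new behavior, the actor keeps running.
inline bool survives_when_on_exit_sets_behavior() {
  toy_actor a;
  a.bhvr_stack.push_back([](int) {});
  a.on_exit = [](toy_actor& self) { self.bhvr_stack.push_back([](int) {}); };
  a.quit();
  return !a.exited && a.bhvr_stack.size() == 1;
}
```

The recursion warning shows up in this model as well: an on_exit hook that itself called `quit()` would re-enter the hook without end.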
resume_result caf::scheduled_actor::resume(execution_unit*, size_t) [override, virtual]
Resume any pending computation until it is either finished or needs to be re-scheduled later.
Implements caf::resumable.
Reimplemented in caf::io::abstract_broker, and caf::io::basp_broker.
void caf::scheduled_actor::set_exception_handler(exception_handler fun)
Sets a custom exception handler for this actor.
If multiple handlers are defined, only the handler that was set last is executed.
std::enable_if<std::is_convertible<F, std::function<error(std::exception_ptr&)>>::value>::type caf::scheduled_actor::set_exception_handler(F f)
Sets a custom exception handler for this actor.
If multiple handlers are defined, only the handler that was set last is executed.
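The two overloads and the "last handler wins" rule can be illustrated with a standard-library-only sketch. It is a simplified model under stated assumptions, not CAF code: `toy_actor` is hypothetical, `std::string` stands in for `caf::error`, and the `handle` member and its "unhandled" fallback exist only for this example.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

// Toy model (NOT the CAF implementation); std::string stands in for error.
struct toy_actor {
  using pointer = toy_actor*;
  using exception_handler
    = std::function<std::string(pointer, std::exception_ptr&)>;

  exception_handler handler_;

  // Overload 1: takes the full handler type; replaces any previous handler.
  void set_exception_handler(exception_handler fun) {
    handler_ = std::move(fun);
  }

  // Overload 2: enabled only for callables taking just the exception pointer;
  // wraps them so the actor pointer argument is ignored.
  template <class F>
  typename std::enable_if<std::is_convertible<
    F, std::function<std::string(std::exception_ptr&)>>::value>::type
  set_exception_handler(F f) {
    set_exception_handler(exception_handler(
      [f](pointer, std::exception_ptr& x) mutable { return f(x); }));
  }

  // Hypothetical helper: dispatches an exception to the current handler.
  std::string handle(std::exception_ptr x) {
    return handler_ ? handler_(this, x) : std::string{"unhandled"};
  }
};

// Sets two handlers in a row; only the second one runs.
inline std::string last_handler_wins() {
  toy_actor a;
  a.set_exception_handler(
    [](toy_actor*, std::exception_ptr&) { return std::string{"first"}; });
  a.set_exception_handler(
    [](std::exception_ptr&) { return std::string{"second"}; });
  return a.handle(std::make_exception_ptr(std::runtime_error{"boom"}));
}
```

Because each call overwrites the stored function object, the sketch keeps a single handler at any time, which matches the documented behavior that only the most recently set handler is executed.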
subtype_t caf::scheduled_actor::subtype() const [override, virtual]
Returns a subtype hint for this object.
This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.
Reimplemented from caf::resumable.
Reimplemented in caf::io::abstract_broker.
result<message> drop(scheduled_actor*, message_view&) [related]
Default handler function that simply drops messages.
result<message> print_and_drop(scheduled_actor*, message_view&) [related]
Default handler function that prints messages via aout and drops them afterwards.
result<message> reflect(scheduled_actor*, message_view&) [related]
Default handler function that sends the message back to the sender.
result<message> reflect_and_quit(scheduled_actor*, message_view&) [related]
Default handler function that sends the message back to the sender and then quits.
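How a pluggable default handler changes what happens to unmatched messages can be sketched with standard-library types only. This is a toy model, not the CAF API: the real handlers take a `message_view&` and return `result<message>`, whereas the hypothetical `toy_*` functions below take a string and record their effect on a `toy_actor`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct toy_actor;

// Simplified handler type (assumption: strings stand in for messages).
using default_handler = std::function<void(toy_actor*, const std::string&)>;

// Toy actor (NOT the CAF implementation) with a fallback for unmatched input.
struct toy_actor {
  std::vector<std::string> replies; // messages "sent back to the sender"
  bool running = true;
  default_handler fallback;

  // Called when no behavior matched the message.
  void on_unmatched(const std::string& msg) {
    if (fallback)
      fallback(this, msg);
  }
};

// Mirrors drop: ignore the message.
inline void toy_drop(toy_actor*, const std::string&) {}

// Mirrors reflect: send the message back to the sender.
inline void toy_reflect(toy_actor* self, const std::string& msg) {
  self->replies.push_back(msg);
}

// Mirrors reflect_and_quit: send the message back, then stop.
inline void toy_reflect_and_quit(toy_actor* self, const std::string& msg) {
  toy_reflect(self, msg);
  self->running = false;
}

// Installs handler h, delivers one unmatched message, and returns the actor.
inline toy_actor deliver(default_handler h, const std::string& msg) {
  toy_actor a;
  a.fallback = std::move(h);
  a.on_unmatched(msg);
  return a;
}
```

Swapping the function passed to `deliver` is the toy counterpart of passing one of the related functions to `set_default_handler`.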
stream_manager_map caf::scheduled_actor::pending_stream_managers_ [protected]
Stores stream managers for pending streams, i.e., streams that have not yet received an ACK.