28 zsock_signal(pPipe, 0);
29 ActorZmq * pActor = static_cast<Salsa::ActorZmq *>(pArg);
32 SPD_TRACE(
"SalsaActorFn::init() <-");
34 if ((ret = pActor->
init())) {
35 SPD_ERROR(
"init() failed! [{}]", ret);
38 SPD_TRACE(
"SalsaActorFn::init()->");
41 SPD_TRACE(
"SalsaActorFn::exec() <-");
42 if ((ret = pActor->
exec())) {
43 SPD_ERROR(
"exec() failed! [{}]", ret);
46 SPD_TRACE(
"SalsaActorFn::exec() ->");
49 SPD_TRACE(
"SalsaActorFn::finish() <-");
50 if ((ret = pActor->
finish())) {
51 SPD_ERROR(
"finish() failed! [{}]", ret);
54 SPD_TRACE(
"SalsaActorFn::finish() ->");
61 zsock_signal(pPipe, 0);
67 zmsg_t * pReceived = zmsg_recv(pPipe);
69 SPD_WARN(
"PA: pReceived == <nullptr> (Exec interrupted)");
74 zframe_t * pFrame = zmsg_first(pReceived);
77 if (zframe_streq(pFrame,
"$TERM")) {
78 SPD_TRACE(
"PA: Terminate received");
79 zmsg_destroy(&pReceived);
84 char * pCommand = zframe_strdup(pFrame);
85 SPD_TRACE(
"PA: got Command [{}]", pCommand);
86 pFrame = zmsg_next(pReceived);
89 char * pUID = zframe_strdup(pFrame);
90 SPD_TRACE(
"PA: got UID [{}]", pUID);
91 pFrame = zmsg_next(pReceived);
94 char * pGID = zframe_strdup(pFrame);
95 SPD_TRACE(
"PA: got GID [{}]", pGID);
96 pFrame = zmsg_next(pReceived);
99 char * pWorker = zframe_strdup(pFrame);
100 SPD_TRACE(
"PA: got Woker [{}]", pWorker);
101 pFrame = zmsg_next(pReceived);
104 char * pUpsream = zframe_strdup(pFrame);
105 SPD_TRACE(
"PA: got Upstream [{}]", pUpsream);
106 pFrame = zmsg_next(pReceived);
109 char * pClient = zframe_strdup(pFrame);
110 SPD_TRACE(
"PA: got Client [{}]", pClient);
111 pFrame = zmsg_next(pReceived);
113 std::string pMessage_str;
114 std::string pLoop_str;
119 char * pLoop = zframe_strdup(pFrame);
122 SPD_TRACE(
"PA: got str logs [{}]", pLoop);
124 std::string pMessage_str;
125 if (pLoop_str ==
"logs") {
126 while ((pFrame = zmsg_next(pReceived)) !=
nullptr) {
127 char * pMessage = zframe_strdup(pFrame);
128 pMessage_str = pMessage;
129 if (pMessage_str ==
"envs") {
133 SPD_TRACE(
"PA: Adding log target [{}]", pMessage);
135 zstr_free(&pMessage);
143 std::vector<std::string> envs;
144 if (pLoop_str ==
"envs") {
145 while ((pFrame = zmsg_next(pReceived)) !=
nullptr) {
146 char * pMessage = zframe_strdup(pFrame);
147 SPD_TRACE(
"PA: Adding env [{}]", pMessage);
148 envs.push_back(pMessage);
149 zstr_free(&pMessage);
152 char * envp[envs.size()];
154 for (
auto s : envs) {
155 char * cstr =
new char[s.length() + 1];
156 strcpy(cstr, s.c_str());
165 SPD_TRACE(
"PA: Destroying message [{}]", static_cast<void *>(pReceived));
166 zmsg_destroy(&pReceived);
170 SPD_TRACE(
"PA: Creating logger");
172 log.
spd()->info(
"Running [{}]", pCommand);
174 SPD_TRACE(
"PA: Waiting for pipes...");
176 if (pipe2(pipefd, O_NONBLOCK)) {
177 SPD_ERROR(
"FAILED to receive pipes!");
179 SPD_TRACE(
"PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
188 SPD_TRACE(
"PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUID, getgid(), pGID);
189 if (setgid(atoi(pGID)) == -1) {
190 SPD_ERROR(
"Problem setting GUI to process !!! ");
193 if (setuid(atoi(pUID)) == -1) {
194 SPD_ERROR(
"Problem setting UID to process !!! ");
198 SPD_TRACE(
"PA Child: uid [{}] guid [{}]", getuid(), getgid());
201 SPD_TRACE(
"PA Child: Running command [{}]", pCommand);
203 unsigned int iCount = 0;
204 char ** ppCommand =
nullptr;
205 char * tmp = std::strtok(pCommand,
" ");
207 SPD_TRACE(
"PA Child: Tokenizing");
210 ppCommand = static_cast<char **>(realloc(ppCommand, (iCount + 2) *
sizeof(
char **)));
211 ppCommand[iCount++] = strdup(tmp);
212 tmp = std::strtok(
nullptr,
" ");
214 ppCommand[iCount] =
nullptr;
219 SPD_TRACE(
"PA Child: Configuring pipes");
224 dup2(pipefd[1], STDOUT_FILENO);
225 dup2(pipefd[1], STDERR_FILENO);
228 if (execvpe(ppCommand[0], ppCommand, envp) == -1) {
238 SPD_TRACE(
"PA Parent: Sending PID [{}] to parent", pid);
240 zmsg_t * pTx = zmsg_new();
241 zmsg_addstr(pTx,
"$PID");
242 zmsg_addstrf(pTx,
"%d", pid);
243 zmsg_addstr(pTx, pUpsream);
244 zmsg_addstr(pTx, pClient);
245 zsock_send(pPipe,
"m", pTx);
255 SPD_TRACE(
"PA Parent: Running command...");
258 waitpid(pid, &stat, WUNTRACED);
259 if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
260 zstr_sendf(pWatcherActor,
"$EXIT");
265 zactor_destroy(&pWatcherActor);
268 int rc = WEXITSTATUS(stat);
270 if (stat == 9) rc = 137;
272 SPD_TRACE(
"PA Parent: Exit [{}] rc [{}]", stat, rc);
275 zmsg_t * pTx = zmsg_new();
276 zmsg_addstr(pTx,
"$EXIT");
277 zmsg_addstrf(pTx,
"%d", rc);
278 zmsg_addstr(pTx, pWorker);
279 zmsg_addstr(pTx, pUpsream);
280 zmsg_addstr(pTx, pClient);
281 zsock_send(pPipe,
"m", pTx);
284 log.
spd()->info(
"Process exited with status [{}]", stat);
288 SPD_ERROR(
"PA Parent: fork() failure!");
290 zmsg_t * pTx = zmsg_new();
291 zmsg_addstr(pTx,
"$FORKFAIL");
292 zsock_send(pPipe,
"m", pTx);
307 SPD_TRACE(
"PA: Terminating persistent actor");
313 zsock_signal(pPipe, 0);
315 Log & commandLogger = *(static_cast<Log *>(pLogger));
322 int fd = commandLogger.
fd();
323 const int LIMIT = PIPE_BUF;
324 char buffer[LIMIT + 1];
325 std::memset(buffer, 0, LIMIT + 1);
327 zpoller_t * pPoller = zpoller_new(
nullptr);
328 zpoller_add(pPoller, pPipe);
329 zpoller_add(pPoller, &fd);
333 void * pRecvSock = zpoller_wait(pPoller, -1);
334 if (pRecvSock == pPipe) {
335 char * pMsg = zstr_recv(pPipe);
336 std::string recvMsg = pMsg;
338 if (recvMsg ==
"$EXIT") {
342 else if (pRecvSock == &fd) {
343 ssize_t readRet = read(fd, buffer, LIMIT);
345 if (buffer[0] !=
'\0') {
346 commandLogger.
write(buffer);
347 memset(buffer, 0,
sizeof(buffer));
353 zpoller_remove(pPoller, pPipe);
354 zpoller_remove(pPoller, &fd);
355 zpoller_destroy(&pPoller);
365 SPD_TRACE(
"ActorZmq::pipe()<-");
366 mpPipe = static_cast<zsock_t *>(pPipe);
375 SPD_TRACE(
"ActorZmq::pipe()->");
384 SPD_TRACE(
"ActorZmq::init()<-");
389 SPD_TRACE(
"ActorZmq::init()->");
399 SPD_TRACE(
"ActorZmq::exec()<-");
406 SPD_WARN(
"ActorZmq::exec() : Other socket from ActorZmq class");
411 SPD_TRACE(
"ActorZmq::exec()->");
421 SPD_TRACE(
"ActorZmq::finish()<-");
422 SPD_TRACE(
"ActorZmq::finish()->");
432 SPD_ERROR(
"Poller is nullptr!");
437 SPD_TRACE(
"ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(
mpPipe));
440 zmsg_t * pMsg = zmsg_recv(
mpPipe);
445 char * pCommand = zmsg_popstr(pMsg);
447 if (streq(pCommand,
"$TERM")) {
448 SPD_TRACE(
"ActorZmq::exec(): received $TERM");
452 SPD_ERROR(
"ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
455 zstr_free(&pCommand);
459 SPD_TRACE(
"ActorZmq::exec(): Poller expired timeout [{}]...",
mTimeout);
462 SPD_TRACE(
"ActorZmq::exec(): Poller terminated ...");
PollerZmq * mpPoller
Internal poller.
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
virtual int init()
First function.
virtual void pipe(void *pipe)
Setter for pipe.
static void signalHandler(int signalNumber)
Setter salsa interruption.
zpoller_t * poller() const
virtual int finish()
Last function.
void fd(int newFD)
Set FD of pipe to watch.
bool mTerminated
Flag if actor should be terminated.
zpoller_t * poller() const
Returns Poller.
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
bool terminated() const
Flag if actor should be terminated.
int write(char const *)
Write to logger.
ZeroMQ implementation of salsa actor class.
zsock_t * mpPipe
Zmq pipe socket.
virtual void add(SocketZyre *pSocket)
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
virtual int exec()
Main function.
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
int create()
Create SPDLOG loger.
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
int mTimeout
Poller timeout.
PollerZmq * pollerZmq() const
virtual void * wait(int timeout=-1)
Waiting for socket.
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.