salsa  0.3.0
ActorZmq.cc
1 #include "ActorZmq.hh"
2 
3 namespace Salsa {
5 {
9 
10  mpPoller = new PollerZmq();
11 }
13 {
17 
18  // Why no smart pointer? Because this one is fully managed within current class.
19  delete mpPoller;
20 }
21 
22 void ActorZmq::SalsaActorFn(zsock_t * pPipe, void * pArg)
23 {
27 
28  zsock_signal(pPipe, 0);
29  ActorZmq * pActor = static_cast<Salsa::ActorZmq *>(pArg);
30  pActor->pipe(pPipe);
31 
32  SPD_TRACE("SalsaActorFn::init()");
33  int ret = 0;
34  if ((ret = pActor->init())) {
35  SPD_ERROR("init() failed! [{}]", ret);
36  return;
37  }
38 
40  SPD_TRACE("SalsaActorFn::exec()");
41  if ((ret = pActor->exec())) {
42  SPD_ERROR("exec() failed! [{}]", ret);
43  return;
44  }
45  }
46 
47  SPD_TRACE("SalsaActorFn::finish()");
48  if ((ret = pActor->finish())) {
49  SPD_ERROR("finish() failed! [{}]", ret);
50  return;
51  }
52 }
53 
54 void ActorZmq::SalsaActorForkFn(zsock_t * pPipe, void *)
55 {
56  // _/(;_;)\_
57  // AT+OK
58  zsock_signal(pPipe, 0);
59 
60  pid_t pid = 0;
61 
62  // PA will be running indefinitely, until interrupted
63  while (true) {
64  zmsg_t * pReceived = zmsg_recv(pPipe);
65  if (!pReceived) {
66  SPD_WARN("PA: pReceived == <nullptr> (Exec interrupted)");
67  break;
68  }
69 
70  // read first frame
71  zframe_t * pFrame = zmsg_first(pReceived);
72 
73  // Terminate on signal
74  if (zframe_streq(pFrame, "$TERM")) {
75  SPD_TRACE("PA: Terminate received");
76  zmsg_destroy(&pReceived);
77  break; // TERMINATE persistent actor
78  }
79  else {
80  // GET command from message
81  char * pCommand = zframe_strdup(pFrame);
82  SPD_TRACE("PA: got Command [{}]", pCommand);
83  pFrame = zmsg_next(pReceived);
84 
85  // GET UID from message
86  char * pUid = zframe_strdup(pFrame);
87  SPD_TRACE("PA: got UID [{}]", pUid);
88  pFrame = zmsg_next(pReceived);
89 
90  // GET GID from message
91  char * pGid = zframe_strdup(pFrame);
92  SPD_TRACE("PA: got GID [{}]", pGid);
93  pFrame = zmsg_next(pReceived);
94 
95  // GET Upsream uuid from message
96  char * pWorker = zframe_strdup(pFrame);
97  SPD_TRACE("PA: got Woker [{}]", pWorker);
98  pFrame = zmsg_next(pReceived);
99 
100  // GET Upsream uuid from message
101  char * pUpsream = zframe_strdup(pFrame);
102  SPD_TRACE("PA: got Upstream [{}]", pUpsream);
103  pFrame = zmsg_next(pReceived);
104 
105  // GET Client uuid from message
106  char * pClient = zframe_strdup(pFrame);
107  SPD_TRACE("PA: got Client [{}]", pClient);
108 
109  // GET targets from message
110  Salsa::Log log;
111  while ((pFrame = zmsg_next(pReceived)) != nullptr) {
112  char * pMessage = zframe_strdup(pFrame);
113  SPD_TRACE("PA: Adding log target [{}]", pMessage);
114  log.add(pMessage);
115  zstr_free(&pMessage);
116  }
118  // if (log.empty())
119  // log.add("");
120 
121  // Destroy message and go on with your life
122  SPD_TRACE("PA: Destroying message [{}]", static_cast<void *>(pReceived));
123  zmsg_destroy(&pReceived);
124 
125  // Initialization DONE ---------------------------------------------
126 
127  SPD_TRACE("PA: Creating logger");
128  log.create();
129  log.spd()->info("Running [{}]", pCommand);
130 
131  SPD_TRACE("PA: Waiting for pipes...");
132  int pipefd[2];
133  if (pipe2(pipefd, O_NONBLOCK)) {
134  SPD_ERROR("FAILED to receive pipes!"); // TODO Inform manager about pipe failure ?
135  }
136  SPD_TRACE("PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
137 
138  // = = = = = = = = = = FORK = = = = = = = = = =
139  pid = fork();
140  if (pid == 0) {
141 
142  // TODO this needs to be improved if we cannot set UID
143  // TODO Check if uid is equal to process uid (in non-root case)
144  if (getuid() == 0) {
145  SPD_TRACE("PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUid, getgid(), pGid);
146  if (setgid(atoi(pGid)) == -1) {
147  SPD_ERROR("Problem setting GUI to process !!! ");
148  return;
149  }
150  if (setuid(atoi(pUid)) == -1) {
151  SPD_ERROR("Problem setting UID to process !!! ");
152  return;
153  }
154 
155  SPD_TRACE("PA Child: uid [{}] guid [{}]", getuid(), getgid());
156  }
157 
158  SPD_TRACE("PA Child: Running command [{}]", pCommand);
159  // FORK Child handler
160  unsigned int iCount = 0;
161  char ** ppCommand = nullptr;
162  char * tmp = std::strtok(pCommand, " ");
163 
164  SPD_TRACE("PA Child: Tokenizing");
165  do {
166  // iCount + 2 because 1) iCounter an iterator and 2) we need nullptr at the end
167  ppCommand = static_cast<char **>(realloc(ppCommand, (iCount + 2) * sizeof(char **)));
168  ppCommand[iCount++] = strdup(tmp);
169  tmp = std::strtok(nullptr, " ");
170  } while (tmp);
171  ppCommand[iCount] = nullptr;
172 
173  // SPD_TRACE("PA Child: Sleeping for 100ms");
174  // std::this_thread::sleep_for(std::chrono::milliseconds(100));
175 
176  SPD_TRACE("PA Child: Configuring pipes");
177 
178  // After these lines you'll be unable to log anything to console, so don't even
179  // try. It's literally a waste of time.
180  close(pipefd[0]);
181  dup2(pipefd[1], STDOUT_FILENO);
182  dup2(pipefd[1], STDERR_FILENO);
183  close(pipefd[1]);
184 
185  if (execvp(ppCommand[0], ppCommand) == -1) {
186  int const err = errno;
187  SPD_ERROR("PA failed to execute command! Error: [{}]", strerror(err));
188  exit(127);
189  }
190  }
191  else if (pid > 0) {
192  // FORK Parent handler
193  // Send PID to parent
194  SPD_TRACE("PA Parent: Sending PID [{}] to parent", pid);
195  {
196  zmsg_t * pTx = zmsg_new();
197  zmsg_addstr(pTx, "$PID");
198  zmsg_addstrf(pTx, "%d", pid);
199  zmsg_addstr(pTx, pUpsream);
200  zmsg_addstr(pTx, pClient);
201  zsock_send(pPipe, "m", pTx);
202  zmsg_destroy(&pTx);
203  }
204 
205  int stat = -1;
206  close(pipefd[1]);
207 
208  log.fd(pipefd[0]);
209  zactor_t * pWatcherActor = zactor_new(actorProcwaitSupport_, &log);
210 
211  SPD_TRACE("PA Parent: Running command...");
212  // Read from pipe until child dies
213  while (true) {
214  waitpid(pid, &stat, WUNTRACED);
215  if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
216  zstr_sendf(pWatcherActor, "$EXIT");
217  break;
218  }
219  }
220 
221  zactor_destroy(&pWatcherActor);
222 
223  close(pipefd[0]);
224  int rc = WEXITSTATUS(stat);
225  // In case of kill -9 : returning 137
226  if (stat == 9) rc = 137;
227 
228  SPD_TRACE("PA Parent: Exit [{}] rc [{}]", stat, rc);
229  {
230 
231  zmsg_t * pTx = zmsg_new();
232  zmsg_addstr(pTx, "$EXIT");
233  zmsg_addstrf(pTx, "%d", rc);
234  zmsg_addstr(pTx, pWorker);
235  zmsg_addstr(pTx, pUpsream);
236  zmsg_addstr(pTx, pClient);
237  zsock_send(pPipe, "m", pTx);
238  zmsg_destroy(&pTx);
239  }
240  log.spd()->info("Process exited with status [{}]", stat);
241  }
242  else {
243  // FORK Failed handler
244  SPD_ERROR("PA Parent: fork() failure!");
245  {
246  zmsg_t * pTx = zmsg_new();
247  zmsg_addstr(pTx, "$FORKFAIL");
248  zsock_send(pPipe, "m", pTx);
249  zmsg_destroy(&pTx);
250  }
251  } // END FORK handling
252 
253  free(pCommand);
254  free(pUid);
255  free(pGid);
256  free(pWorker);
257  free(pUpsream);
258  free(pClient);
259 
260  } // END execute command
261  } // END while (true)
262 
263  SPD_TRACE("PA: Terminating persistent actor");
264  return;
265 }
266 
267 void ActorZmq::actorProcwaitSupport_(zsock_t * pPipe, void * pLogger)
268 {
269  zsock_signal(pPipe, 0);
270 
271  Log & commandLogger = *(static_cast<Log *>(pLogger));
272  // 3) 2)1)
273  // Since this is kind of hard to read:
274  // 1) Cast pLogger to Log *
275  // 2) Get its value
276  // 3) Set reference to it
277 
278  int fd = commandLogger.fd();
279  const int LIMIT = PIPE_BUF;
280  char buffer[LIMIT + 1];
281  std::memset(buffer, 0, LIMIT + 1);
282 
283  zpoller_t * pPoller = zpoller_new(nullptr);
284  zpoller_add(pPoller, pPipe);
285  zpoller_add(pPoller, &fd);
286 
287  while (true) {
288  // Possible death race condition... I'm looking at you Valgrind
289  void * pRecvSock = zpoller_wait(pPoller, -1);
290  if (pRecvSock == pPipe) {
291  char * pMsg = zstr_recv(pPipe);
292  std::string recvMsg = pMsg;
293  free(pMsg);
294  if (recvMsg == "$EXIT") {
295  break;
296  }
297  }
298  else if (pRecvSock == &fd) {
299  ssize_t readRet = read(fd, buffer, LIMIT);
300  if (readRet > 0) {
301  if (buffer[0] != '\0') {
302  commandLogger.write(buffer);
303  memset(buffer, 0, sizeof(buffer));
304  }
305  }
306  }
307  }
308 
309  zpoller_remove(pPoller, pPipe);
310  zpoller_remove(pPoller, &fd);
311  zpoller_destroy(&pPoller);
312  return;
313 }
314 
315 void ActorZmq::pipe(void * pPipe)
316 {
320 
321  SPD_TRACE("ActorZmq::pipe()<-");
322  mpPipe = static_cast<zsock_t *>(pPipe);
323 
324  if (!mpPoller) {
325  mpPoller = new PollerZmq();
326  }
327 
328  if (mpPipe) {
329  mpPoller->add(mpPipe);
330  }
331  SPD_TRACE("ActorZmq::pipe()->");
332 }
333 
335 {
339 
340  SPD_TRACE("ActorZmq::init()<-");
341  SPD_TRACE("ActorZmq::init()->");
342  return 0;
343 }
344 
346 {
350 
351  SPD_TRACE("ActorZmq::exec()<-");
352 
353  void * pEvent;
354  while (!mTerminated && !Salsa::Actor::interrupted()) {
355  pEvent = wait();
356  if (pEvent) {
357  // handle other socket
358  SPD_WARN("ActorZmq::exec() : Other socket from ActorZmq class");
359  }
360  }
361 
362  SPD_TRACE("ActorZmq::exec() : Salsa::interrupted() [{}]", Salsa::Actor::interrupted());
363  SPD_TRACE("ActorZmq::exec()->");
364  return 0;
365 }
366 
368 {
372 
373  SPD_TRACE("ActorZmq::finish()<-");
374  SPD_TRACE("ActorZmq::finish()->");
375  return 0;
376 }
377 
379 {
383  if (!mpPoller) {
384  SPD_ERROR("Poller is nullptr!");
385  return nullptr;
386  }
387 
388  void * pEvent = mpPoller->wait(mTimeout);
389  SPD_TRACE("ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(mpPipe));
390 
391  if (mpPipe && pEvent == mpPipe) {
392  zmsg_t * pMsg = zmsg_recv(mpPipe);
393  if (!pMsg) {
394  return nullptr;
395  }
396 
397  char * pCommand = zmsg_popstr(pMsg);
398  zmsg_destroy(&pMsg);
399  if (streq(pCommand, "$TERM")) {
400  SPD_TRACE("ActorZmq::exec(): received $TERM");
401  mTerminated = true;
402  }
403  else {
404  SPD_ERROR("ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
405  assert(false); // We should __not__ use assert here, because it's only used to debug...
406  }
407  zstr_free(&pCommand);
408  }
409  else {
410  if (zpoller_expired(mpPoller->poller())) {
411  SPD_TRACE("ActorZmq::exec(): Poller expired timeout [{}]...", mTimeout);
412  }
413  else if (zpoller_terminated(mpPoller->poller())) {
414  SPD_TRACE("ActorZmq::exec(): Poller terminated ...");
415  mTerminated = true;
416  }
417  else {
418  return pEvent;
419  }
420  }
421 
422  return pEvent;
423 }
424 
425 zpoller_t * ActorZmq::poller() const
426 {
430  return mpPoller->poller();
431 }
433 {
437  return mpPoller;
438 }
439 } // namespace Salsa
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:41
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
Definition: Log.hh:37
virtual int init()
First function.
Definition: ActorZmq.cc:334
virtual void pipe(void *pipe)
Setter for pipe.
Definition: ActorZmq.cc:315
virtual ~ActorZmq()
Definition: ActorZmq.cc:12
zpoller_t * poller() const
Definition: ActorZmq.cc:425
virtual int finish()
Last function.
Definition: ActorZmq.cc:367
void fd(int newFD)
Set FD of pipe to watch.
Definition: Log.hh:42
bool mTerminated
Flag if actor should be terminated.
Definition: ActorZmq.hh:42
zpoller_t * poller() const
Returns Poller.
Definition: PollerZmq.hh:27
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
Definition: ActorZmq.cc:267
int write(char const *)
Write to logger.
Definition: Log.cc:49
virtual void * wait()
Definition: ActorZmq.cc:378
salsa node class
Definition: PollerZmq.hh:16
ZeroMQ implementation of salsa actor class
Definition: ActorZmq.hh:19
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:40
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
Definition: Actor.cc:2
Base salsa actor class
Definition: Actor.hh:17
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
Definition: ActorZmq.cc:22
virtual int exec()
Main function.
Definition: ActorZmq.cc:345
Definition: Log.hh:17
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition: Actor.hh:35
int create()
Create SPDLOG loger.
Definition: Log.cc:31
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition: ActorZmq.cc:54
int mTimeout
Poller timeout.
Definition: ActorZmq.hh:43
PollerZmq * pollerZmq() const
Definition: ActorZmq.cc:432
virtual void * wait(int timeout=-1)
Waiting for socket.
Definition: PollerZmq.cc:56
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.
Definition: Log.cc:10