salsa  0.4.0
NodeManager.cc
1 #include <json/json.h>
2 #include "NodeManager.hh"
3 namespace Salsa {
4 NodeManager::NodeManager() : Object(), mFinishedJobTimeout(24 * 3600)
5 {
9  srand(static_cast<uint32_t>(time(nullptr))); // seed with time since epoch
10 }
11 
// Body of the destructor. The signature line is absent from this listing; the cross-reference
// list at the end of the listing places `virtual ~NodeManager()` at NodeManager.cc:12.
13 {
17 
// For every known job: let the task pool (when one exists) terminate it, then free the Job object.
18  for (auto job : mJobs) {
19  if (mpTaskPool) {
20  mpTaskPool->terminateJob(job.second);
21  }
22  delete job.second;
23  }
24  // terminateJobAll();
// The map now only holds freed pointers, so drop all entries.
25  mJobs.clear();
// The task pool and the publisher are held through raw pointers and are deleted here.
26  delete mpTaskPool;
27  delete mpPublisher;
28 }
29 
30 void NodeManager::print(std::string /*opt*/) const
31 {
35 
36  SPD_TRACE("mFeeders [{}] mConsumers [{}] mWorkers [{}] mJobs [{}] ", mFeeders.size(), mConsumers.size(),
37  mWorkers.size(), mJobs.size());
38 
39  if (mJobs.size() > 0) {
40  SPD_DEBUG("= JOBS =======================");
41  for (auto j : mJobs) {
42  j.second->print();
43  }
44  SPD_DEBUG("==============================");
45  }
46  else {
47  SPD_DEBUG("= NO JOBS ====================");
48  }
49 
50  if (mpTaskPool) {
51  mpTaskPool->print();
52  }
53 }
54 
55 void NodeManager::addConsumer(std::string uuid, std::shared_ptr<Socket> pSocket)
56 {
60 
61  mConsumers.emplace(uuid, std::make_shared<Consumer>(uuid, pSocket, this));
62 }
63 
64 void NodeManager::addFeeder(std::string uuid, std::shared_ptr<Socket> pSocket)
65 {
69 
70  mFeeders.emplace(uuid, std::make_shared<Feeder>(uuid, pSocket, this));
71 }
72 
73 void NodeManager::addWorker(std::string uuid, std::shared_ptr<Socket> pSocket)
74 {
78 
79  mWorkers.emplace(uuid, std::make_shared<Worker>(uuid, pSocket, this));
80 }
81 
82 Socket * NodeManager::onEnter(std::string self, std::string fromType, Message * msg, std::vector<std::string> & out)
83 {
87 
88  SPD_TRACE("NodeManager::onEnter self [{}] from [{}] type [{}] msg [{}]", self, msg->uuid(), fromType,
89  static_cast<void *>(msg));
90 
91  // TODO Implement map<std::string /* self */, Node::ENodeType?>
92  auto pFeeder = feeder(self);
93  if (pFeeder) {
94  SPD_DEBUG("::onEnter FEEDER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
95  SPD_INFO("FEEDER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
96  if (fromType == "CONSUMER") {
97  pFeeder->addClient(msg->uuid(), fromType);
98  pFeeder->onEnter(msg, out, fromType);
99  }
100  else if (fromType == "WORKER") {
101  pFeeder->addClient(msg->uuid(), fromType);
102  pFeeder->onEnter(msg, out, fromType);
103  }
104  else if (fromType == "DISCOVERY") {
105  // We fully ignoring it
106  SPD_DEBUG("DISCOVERY is here");
107  }
108  else {
109  pFeeder->addOther(msg->uuid(), fromType);
110  }
111 
112  return pFeeder->pipe().get();
113  }
114 
115  auto pConsumer = consumer(self);
116  if (pConsumer) {
117  SPD_DEBUG("::onEnter CONSUMER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
118  SPD_INFO("CONSUMER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
119  if (fromType == "FEEDER") {
120  pConsumer->addClient(msg->uuid(), fromType);
121  pConsumer->onEnter(msg, out, fromType);
122  }
123  else {
124  pConsumer->addOther(msg->uuid(), fromType);
125  }
126 
127  return pConsumer->pipe().get();
128  }
129 
130  auto pWorker = worker(self);
131  if (pWorker) {
132  SPD_DEBUG("::onEnter WORKER [{}] has client on network [{}] type [{}]", self, msg->uuid(), fromType);
133  SPD_INFO("WORKER [{}] <= [{}] [{}]", self, msg->uuid(), fromType);
134  if (fromType == "FEEDER") {
135  pWorker->addClient(msg->uuid(), fromType);
136  pWorker->onEnter(msg, out, fromType);
137  }
138  else {
139  pWorker->addOther(msg->uuid(), fromType);
140  }
141 
142  return pWorker->pipe().get();
143  }
144 
145  return nullptr;
146 }
147 Socket * NodeManager::onExit(std::string self, Message * msg, std::vector<std::string> & out)
148 {
152 
153  SPD_TRACE("NodeManager::onExit self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
154 
155  auto pWorker = worker(self);
156  if (pWorker) {
157  SPD_DEBUG("::onExit WORKER [{}] client on network [{}] has left", self, msg->uuid());
158  SPD_INFO("WORKER [{}] => [{}]", self, msg->uuid());
159  pWorker->onExit(msg, out);
160  pWorker->removeClient(msg->uuid());
161  return pWorker->pipe().get();
162  }
163 
164  auto pFeeder = feeder(self);
165  if (pFeeder) {
166  SPD_DEBUG("::onExit FEEDER [{}] client on network [{}] has left", self, msg->uuid());
167  SPD_INFO("FEEDER [{}] => [{}]", self, msg->uuid());
168  pFeeder->onExit(msg, out);
169  pFeeder->removeClient(msg->uuid());
170  return pFeeder->pipe().get();
171  }
172 
173  auto pConsumer = consumer(self);
174  if (pConsumer) {
175  SPD_DEBUG("::onExit CONSUMER [{}] client on network [{}] has left", self, msg->uuid());
176  SPD_INFO("CONSUMER [{}] => [{}]", self, msg->uuid());
177  pConsumer->onExit(msg, out);
178  pConsumer->removeClient(msg->uuid());
179  return pConsumer->pipe().get();
180  }
181 
182  return nullptr;
183 }
184 
185 Socket * NodeManager::onWhisper(std::string self, Message * msg, std::vector<std::string> & out)
186 {
190 
191  SPD_TRACE("NodeManager::onWhisper self [{}] from [{}] msg [{}]", self, msg->uuid(), static_cast<void *>(msg));
192 
193  auto pFeeder = feeder(self);
194  if (pFeeder) {
195  SPD_TRACE("::onWhisper() FEEDER [{}] from [{}] has msg", self, msg->uuid());
196  pFeeder->onWhisper(msg, out);
197  return pFeeder->pipe().get();
198  }
199 
200  auto pConsumer = consumer(self);
201  if (pConsumer) {
202  SPD_TRACE("::onWhisper() CONSUMER [{}] from [{}] has msg", self, msg->uuid());
203  pConsumer->onWhisper(msg, out);
204  return pConsumer->pipe().get();
205  }
206 
207  auto pWorker = worker(self);
208  if (pWorker) {
209  SPD_TRACE("::onWhisper() WORKER [{}] from [{}] has msg", self, msg->uuid());
210  pWorker->onWhisper(msg, out);
211  return pWorker->pipe().get();
212  }
213 
214  return nullptr;
215 }
216 
217 bool NodeManager::sendWhisper(Socket * /*s*/, std::string /*to*/, std::vector<std::string> & /*v*/)
218 {
222 
223  return true;
224 }
225 
226 void NodeManager::addTask(TaskInfo * pTaskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType type)
227 {
231 
232  Salsa::Job * pJob = nullptr;
233  auto search = mJobs.find(pTaskInfo->jobid());
234  if (search != mJobs.end()) {
235  // if pJob was found
236  pJob = search->second;
237  }
238  else {
239  // if pJob was not found
240  pJob = new Salsa::Job(pTaskInfo->jobid());
241  pJob->consumer(cuuid);
242  pJob->feeder(fuuid);
243  mJobs.insert(std::make_pair(pTaskInfo->jobid(), pJob));
244  mActiveJobs.push_back(pTaskInfo->jobid());
245  // TODO : now we need to tell all feeders that theat they should subscribe to workers
246  SPD_TRACE("Looping feeders");
247  for (auto feeder : mFeeders) {
249  SPD_TRACE("Subscribe to feeder [{}]", feeder.first);
250  feeder.second->subscribe(pTaskInfo->jobid());
251  }
252  }
253 
254  SPD_TRACE("::addTask from [{}] with task id [{}]", pTaskInfo->jobid(), pTaskInfo->taskid());
255  pJob->addTask(pTaskInfo->taskid(), pTaskInfo, type);
256 }
257 
// Body of getNextTask(). The signature line is absent from this listing; the cross-reference
// list at the end of the listing places `TaskInfo * getNextTask()` at NodeManager.cc:258.
// Returns the next task of a randomly chosen active job, or nullptr when no active job
// yields a task.
259 {
263  TaskInfo * pTaskInfo = nullptr;
264 
265  SPD_TRACE("mActiveJobs.size() [{}]", mActiveJobs.size());
// Keep drawing until a task is found or the list of active jobs is exhausted.
266  while (mActiveJobs.size() > 0 && pTaskInfo == nullptr) {
// Pick a random entry of mActiveJobs (rand() is seeded in the constructor).
267  size_t index = static_cast<size_t>(rand()) % mActiveJobs.size();
268  std::string jobstr = mActiveJobs[index];
269  auto iJob = mJobs.find(jobstr);
270  if (iJob != mJobs.end()) {
271  pTaskInfo = iJob->second->nextTask();
272  if (pTaskInfo) {
273  SPD_TRACE("getNextTask FEEDER [{}] JOB [{}:{}]", iJob->first, pTaskInfo->jobid(), pTaskInfo->taskid());
274  return pTaskInfo;
275  }
276  }
277 
// Reached when the job is unknown or returned no task: drop it from the active list so the
// loop is guaranteed to make progress.
278  // removing jobstring from mActiveJobs
279  mActiveJobs.erase(std::remove(begin(mActiveJobs), end(mActiveJobs), jobstr), end(mActiveJobs));
280  }
281 
282  SPD_TRACE("::getNextTask No pTaskInfo found");
283  return nullptr;
284 }
285 
286 void NodeManager::resultTask(TaskInfo * pTask)
287 {
291 
292  Job * pJob = job(pTask->jobid());
293  if (pJob == nullptr) {
294  delete pTask;
295  return;
296  }
297 
298  SPD_TRACE("TASK ENDED JOB [{}:{}]", pTask->jobid(), pTask->taskid());
299  // search->second->moveTask(pTask->taskid(), pTask, Salsa::Job::running, Salsa::Job::done);
300 
301  // If job has no consumer we end (assuming that it is SUBMITTER)
302  if (pJob->consumer().empty()) {
303  // TODO : Fix done and failed
304  auto sourceQueue = (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) ? Job::assigned : Job::running;
305  if (pTask->returncode() == 0) {
306  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::done);
307  }
308  else {
309  pJob->moveTask(pTask->taskid(), sourceQueue, Salsa::Job::failed);
310  }
311 
312  if (pJob->isFinished()) {
314  mFinishedJobs.push_back(pJob->uuid());
315  }
316 
317  resultTaskToExternal(pJob, pTask);
318 
319  print();
320  // TODO we need to think what to do with TaskInfo object in highest level
321  delete pTask;
322  return;
323  }
324 
325  if (pJob->isTaskInQueue(pTask->taskid(), Salsa::Job::assigned)) {
326  SPD_WARN("Task [{}] duplicate found in [assigned] queue!", pTask->taskid());
327  pJob->removeTask(pTask->taskid(), Salsa::Job::assigned);
328  }
329  else {
330  SPD_WARN("Task [{}] duplicate found in [running] queue!", pTask->taskid());
331  pJob->removeTask(pTask->taskid(), Salsa::Job::running);
332  }
333 
334  std::shared_ptr<Consumer> pConsumer = consumer(pJob->consumer());
335  std::vector<std::string> out = {"TASK_RESULT"};
336  // out.push_back("TASK_RESULT");
337  std::string payload;
338  pTask->SerializeToString(&payload);
339  out.push_back(payload);
340  uint32_t slots = nSlots();
341 
342  // delete pTask;
343 
344  // TODO only for testing, REMOVE IT later
345  // - Well, what about #ifdef DEBUG ?
346  if (getenv("SALSA_FAKE")) slots *= 10;
347 
348  if (pJob->size(Job::pending) < slots) {
349  if (pJob->haveMoreTasks()) {
350  SPD_TRACE("We are requesting new tasks [{}] haveMoreTasks [{}]", slots, pJob->haveMoreTasks());
351  out.push_back("&");
352  out.push_back("SENDTASKS");
353  out.push_back(fmt::format("{}", slots));
354  }
355  }
356 
357  sendWhisper(pConsumer->pipe().get(), pJob->feeder(), out);
358 }
359 
360 void NodeManager::terminateJob(std::string uuid)
361 {
365 
366  SPD_TRACE("Terminating job from client [{}]", uuid);
367 
368  auto iJob = mJobs.find(uuid);
369  if (iJob != mJobs.end()) {
370  if (mpTaskPool) {
371  mpTaskPool->terminateJob(iJob->second);
372  }
373 
374  for (auto f : mFeeders) {
375  f.second->terminateJob(uuid);
376  }
377 
378  mFinishedJobs.erase(std::remove(begin(mFinishedJobs), end(mFinishedJobs), uuid), end(mFinishedJobs));
379 
380  SPD_TRACE("Removing job [{}]", uuid);
381  delete iJob->second;
382  iJob->second = nullptr;
383  mJobs.erase(iJob);
384  }
385  SPD_TRACE("NodeManager::terminateJob print()");
386  print();
387 }
388 
// Body of terminateFinishedJobs(). The signature line is absent from this listing; the
// cross-reference list at the end of the listing places `virtual bool terminateFinishedJobs()`
// at NodeManager.cc:389.
// Terminates every finished job whose age exceeds mFinishedJobTimeout. Returns true when at
// least one job was terminated, false otherwise.
390 {
394 
395  if (mFinishedJobs.size() == 0) return false;
396 
397  SPD_DEBUG("Checking finished jobs [{}] to be removed ...", mFinishedJobs.size());
398 
// Current time as whole seconds since the epoch, compared against Job::timeFinished().
399  std::chrono::time_point<std::chrono::system_clock> curTime = std::chrono::system_clock::now();
400  uint64_t curTimeEpoch = std::chrono::duration_cast<std::chrono::seconds>(curTime.time_since_epoch()).count();
// terminateJob() erases from mFinishedJobs, so the uuids to clean are collected first and
// terminated after this loop.
401  std::vector<std::string> cleanUUID;
402  for (auto js : mFinishedJobs) {
403  auto j = job(js);
404  if (j == nullptr) continue;
405  if (curTimeEpoch - j->timeFinished() > mFinishedJobTimeout) {
// NOTE(review): the continuation line of this SPD_DEBUG call (listing line 407) is missing
// from this listing; presumably it passes the timeout value for the second placeholder.
406  SPD_DEBUG("Terminating finished job. Time : diff[{}] timeout[{}]", curTimeEpoch - j->timeFinished(),
408  cleanUUID.push_back(js);
409  }
410  }
411  if (!cleanUUID.size()) return false;
412 
413  for (auto u : cleanUUID) {
414  terminateJob(u);
415  }
416 
417  return true;
418 }
419 
420 void NodeManager::terminateAllJobs(bool finishedonly)
421 {
425  if (mJobs.size() == 0) return;
426 
427  std::vector<std::string> cleanUUID;
428  if (finishedonly)
429  for (auto job : mFinishedJobs) cleanUUID.push_back(job);
430 
431  else {
432  for (auto job : mJobs) cleanUUID.push_back(job.first);
433  }
434 
435  for (auto u : cleanUUID) {
436  SPD_DEBUG("Terminating [{}]", u);
437  terminateJob(u);
438  }
439 }
440 
441 std::shared_ptr<Feeder> NodeManager::feeder(std::string uuid) const
442 {
447  auto search = mFeeders.find(uuid);
448  if (search != mFeeders.end()) {
449  return search->second;
450  }
451  else {
452  return nullptr;
453  }
454 }
455 std::shared_ptr<Consumer> NodeManager::consumer(std::string uuid) const
456 {
461  auto search = mConsumers.find(uuid);
462  if (search != mConsumers.end()) {
463  return search->second;
464  }
465  return nullptr;
466 }
467 std::shared_ptr<Worker> NodeManager::worker(std::string uuid) const
468 {
473  auto search = mWorkers.find(uuid);
474  if (search != mWorkers.end()) {
475  return search->second;
476  }
477  return nullptr;
478 }
479 
480 Job * NodeManager::job(std::string uuid)
481 {
486  auto search = mJobs.find(uuid);
487  if (search != mJobs.end()) {
488  return search->second;
489  }
490  return nullptr;
491 }
492 
// Body of addTaskSlot(). The signature line is absent from this listing; the cross-reference
// list at the end of the listing places `virtual void addTaskSlot()` at NodeManager.cc:493.
// Creates the task pool on first use; later calls leave the existing pool untouched.
494 {
498  if (mpTaskPool == nullptr) {
499  mpTaskPool = new TaskPool(this);
500  }
501 }
502 
503 bool NodeManager::handleTaskPool(void * /*p*/)
504 {
508  return false;
509 }
510 
// Body of taskPool(). The signature line is absent from this listing; the cross-reference
// list at the end of the listing places `TaskPool * taskPool()` at NodeManager.cc:511.
// Returns the stored task pool pointer (nullptr until addTaskSlot() has created it).
512 {
516  return mpTaskPool;
517 }
518 
// Body of hasJobs(). The signature line is absent from this listing; the cross-reference
// list at the end of the listing places `bool hasJobs() const` at NodeManager.cc:519.
// True when at least one uuid is listed in mActiveJobs.
520 {
524  return mActiveJobs.size() > 0;
525 }
526 
527 void NodeManager::jobs(std::string client_uuid, std::vector<std::string> & jobs) const
528 {
532 
533  for (auto search : mJobs) {
534  if (search.second != nullptr && search.second->feeder() == client_uuid) {
535  jobs.push_back(search.first);
536  }
537  }
538 }
539 int32_t NodeManager::nSlots(double mult) const
540 {
544 
545  int32_t num = 0;
546  for (auto feeder : mFeeders) {
547  num += feeder.second->nodeInfo()->slots();
548  }
549  return num * mult;
550 }
551 
552 void NodeManager::noMoreTasks(std::string /*jobUUID*/)
553 {
557 
558  (void)(0);
559  // auto search = mJobs.find(jobUUID);
560  // if (search != mJobs.end()) {
561  // return search->second->haveMoreTasks();
562  //}
563 }
564 
// Body of the argument-less haveMoreTasks(). The signature line is absent from this listing;
// the cross-reference list at the end of the listing places `virtual bool haveMoreTasks()` at
// NodeManager.cc:565.
// Returns true when at least one job has pending tasks. Every such job that is not yet
// listed in mActiveJobs is appended to it.
566 {
570 
571  bool rc = false;
572  for (auto job : mJobs) {
573  if (job.second->Job::size(Job::pending)) {
574  // job.second->haveMoreTasks(true);
// Linear search: is this job already in the active list?
575  bool isActiveJob = false;
576  for (auto pActiveJobUUID : mActiveJobs) {
577  if (pActiveJobUUID == job.first) {
578  isActiveJob = true;
579  break;
580  }
581  }
582 
583  if (!isActiveJob) {
584  mActiveJobs.push_back(job.first);
585  }
586 
587  rc = true;
588  }
589  }
590  return rc;
591 }
592 
593 bool NodeManager::haveMoreTasks(std::string jobUUID)
594 {
598  auto found = mJobs.find(jobUUID);
599  if (found != mJobs.end()) {
600  return found->second->haveMoreTasks();
601  }
602  else {
603  return false;
604  }
605 }
606 
// Body of a setter that stores pPublisher in mpPublisher. The signature line (listing line
// 607) is absent from this listing.
// NOTE(review): presumably this is the publisher(Publisher *) overload — confirm against the header.
// The destructor deletes mpPublisher, so the stored object is released by this class.
608 {
612  mpPublisher = pPublisher;
613 }
614 
// Body of the publisher getter. The signature line is absent from this listing; the
// cross-reference list at the end of the listing places `virtual Publisher * publisher() const`
// at NodeManager.cc:615.
// Returns the stored publisher pointer; publish() treats a null pointer as "no publisher".
616 {
620  return mpPublisher;
621 }
622 
623 void NodeManager::publish(std::string id, bool force) const
624 {
628  if (!mpPublisher) return;
629 
630  bool changed = false;
631  Json::Value json;
632  Json::Value & json_jobs = json["jobs"];
633  json_jobs = Json::arrayValue;
634  json["version"] =
635  fmt::format("v{}.{}.{}-{}", SALSA_VERSION_MAJOR(SALSA_VERSION), SALSA_VERSION_MINOR(SALSA_VERSION),
636  SALSA_VERSION_PATCH(SALSA_VERSION), SALSA_VERSION_RELEASE);
637  // json_node = Json::arrayValue;
638  if (mJobs.size() > 0) {
639  for (auto job : mJobs) {
640  if (job.second->changed()) changed = true;
641  }
642 
643  if (!force && !changed) return;
644 
645  for (auto job : mJobs) {
646  job.second->json(json_jobs);
647  }
648  }
649 
650  // Fill node info
651  std::string name = id;
652  auto f = feeder(id);
653  if (f) {
654  json["node"] = f->jsonValueNodeInfo();
655  if (!f->nodeInfo()->name().empty()) name = f->nodeInfo()->name();
656  }
657 
658  Json::StreamWriterBuilder wBuilder;
659  wBuilder["indentation"] = "";
660  std::string data = Json::writeString(wBuilder, json);
661 
662  // print();
663  SPD_TRACE("Publish sub [salsa:{}] id [{}] data [{}] ", id, name, data);
664  mpPublisher->publish(id, name, data);
665 
666  for (auto job : mJobs) {
667  job.second->changed(false);
668  }
669 }
670 
671 } // namespace Salsa
std::vector< std::string > mActiveJobs
List of active jobs.
Definition: NodeManager.hh:89
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:89
Base Message class.
Definition: Message.hh:15
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:52
void consumer(std::string uuid)
Definition: Job.cc:227
uint64_t mFinishedJobTimeout
Finished job timeout in seconds.
Definition: NodeManager.hh:91
void jobs(std::string clientUUID, std::vector< std::string > &jobs) const
Definition: NodeManager.cc:527
std::shared_ptr< Worker > worker(std::string uuid) const
Definition: NodeManager.cc:467
bool terminateJob(Job *pJob)
Definition: TaskPool.cc:99
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:147
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:360
std::map< std::string, std::shared_ptr< Consumer > > mConsumers
List of Consumers.
Definition: NodeManager.hh:93
virtual void addTaskSlot()
Definition: NodeManager.cc:493
virtual bool haveMoreTasks()
Definition: NodeManager.cc:565
virtual void noMoreTasks(std::string jobUUID)
Definition: NodeManager.cc:552
Job class.
Definition: Job.hh:16
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:269
bool changed() const
Returns whether the job info was changed.
Definition: Job.hh:68
virtual void publish(std::string id, bool force=false) const
Definition: NodeManager.cc:623
int32_t nSlots(double mult=1.0) const
Definition: NodeManager.cc:539
EQueueType
Queue types.
Definition: Job.hh:19
std::string uuid() const
returns UUID
Definition: Job.hh:28
virtual void resultTask(TaskInfo *task)
Definition: NodeManager.cc:286
void feeder(std::string uuid)
Definition: Job.cc:244
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:64
virtual std::string uuid() const =0
Returns node uuid.
Base Publisher class.
Definition: Publisher.hh:14
Job * job(std::string uuid)
Definition: NodeManager.cc:480
bool hasJobs() const
Definition: NodeManager.cc:519
void print(std::string opt="") const
Definition: NodeManager.cc:30
std::map< std::string, Job * > mJobs
List of jobs.
Definition: NodeManager.hh:88
virtual Publisher * publisher() const
Definition: NodeManager.cc:615
std::vector< std::string > mFinishedJobs
List of finished jobs.
Definition: NodeManager.hh:90
TaskInfo * getNextTask()
Definition: NodeManager.cc:258
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:185
std::map< std::string, std::shared_ptr< Worker > > mWorkers
List of Workers.
Definition: NodeManager.hh:92
virtual ~NodeManager()
Definition: NodeManager.cc:12
void print(bool verbose=false) const
Definition: TaskPool.cc:128
Base Socket class.
Definition: Socket.hh:15
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:261
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:29
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:55
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:95
virtual bool sendWhisper(Socket *s, std::string to, std::vector< std::string > &v)
Definition: NodeManager.cc:217
void json(Json::Value &json)
Definition: Job.cc:164
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:96
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:73
virtual bool handleTaskPool(void *p)
Definition: NodeManager.cc:503
TaskPool * taskPool()
Get NM's task pool.
Definition: NodeManager.cc:511
Base Salsa Object class.
Definition: Object.hh:15
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:389
std::map< std::string, std::shared_ptr< Feeder > > mFeeders
List of Feeders.
Definition: NodeManager.hh:94
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:82
Base salsa TaskPool class.
Definition: TaskPool.hh:18
virtual void publish(std::string id, std::string name, std::string data)=0
Publish TODO publish what?
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:226
std::shared_ptr< Feeder > feeder(std::string uuid) const
Definition: NodeManager.cc:441
size_t size(EQueueType t=all) const
Definition: Job.cc:196
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:136
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:420
std::shared_ptr< Consumer > consumer(std::string uuid) const
Definition: NodeManager.cc:455
virtual void resultTaskToExternal(Job *, TaskInfo *)
Handle return of task and send it to external client.
Definition: NodeManager.hh:66