salsa  0.4.0
Job.cc
1 #include "Job.hh"
2 
3 namespace Salsa {
4 Job::Job(std::string uuid, std::string type) : Object(), mUUID(uuid), mType(type)
5 {
9 
10  // mTimeStarted = std::chrono::system_clock::now();
11  mTimeStarted =
12  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
13 }
14 
16 {
20  for (int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
21  for (auto iTask : mTasks[iType]) {
22  delete iTask.second;
23  iTask.second = nullptr;
24  }
25  mTasks[iType].clear();
26  }
27 }
28 
29 bool Job::addTask(uint32_t id, TaskInfo * pTaskInfo, EQueueType type)
30 {
34  if (!pTaskInfo) {
35  return false;
36  }
37 
38  if (type >= all) {
39  SPD_CRIT("EQueueType is out of range [{}]", type);
40  return false;
41  }
42 
43  if (mUid == 99 && mGid == 99) {
44  mUid = pTaskInfo->clientid();
45  mGid = pTaskInfo->groupid();
46  }
47 
48  mTasks[type].insert(std::make_pair(id, pTaskInfo));
49  return true;
50 }
51 
52 bool Job::moveTask(uint32_t id, EQueueType from, EQueueType to)
53 {
57 
58  return moveTask(id, nullptr, from, to);
59 }
60 
61 bool Job::moveTask(JobID_t id, TaskInfo * pTaskInfo, EQueueType from, EQueueType to)
62 {
66  auto iFound = mTasks[from].find(id);
67  if (iFound != mTasks[from].end()) {
68  if (pTaskInfo == nullptr) {
69  pTaskInfo = iFound->second;
70  }
71  else {
72  delete iFound->second;
73  }
74  // TODO This is asking for trouble...
75  // Possible fix would be... std::shared_ptr;
76  // Also, why do we even need to supply pTaskInfo anyways?
77 
78  mTasks[from].erase(iFound);
79  addTask(id, pTaskInfo, to);
80  mChanged = true;
81  return true;
82  }
83  else {
84  SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", id, from);
85  return false;
86  }
87 }
88 
89 bool Job::removeTask(uint32_t id, EQueueType from)
90 {
94 
95  // TODO This could cause problems at some point...
96  auto found = mTasks[from].find(id);
97  if (found != mTasks[from].end()) {
98  mTasks[from].erase(found);
99  return true;
100  }
101 
102  return false;
103 }
104 
105 TaskInfo * Job::nextTask()
106 {
110 
111  auto iAvailTask = mTasks[EQueueType::pending].begin();
112  if (iAvailTask == mTasks[EQueueType::pending].end()) {
113  return nullptr;
114  }
115 
116  TaskInfo * pNewTask = iAvailTask->second;
117  moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
118  return pNewTask;
119 }
120 
121 void Job::tasks(std::vector<TaskInfo *> & targetVec, EQueueType type, bool shouldClear)
122 {
126 
127  for (auto task : mTasks[type]) {
128  targetVec.push_back(task.second);
129  }
130 
131  if (shouldClear) {
132  mTasks[type].clear();
133  }
134 }
135 
137 {
141 
142  return (mTasks[type].find(id) != mTasks[type].end());
143 
144  // auto found = mTasks[type].find(id);
145  // if (found != mTasks[type].end()) {
146  // return true;
147  //}
148  // else {
149  // return false;
150  //}
151 }
152 
153 void Job::print() const
154 {
158  SPD_DEBUG("{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]", mUUID, mTasks[EQueueType::pending].size(),
159  mTasks[EQueueType::assigned].size(), mTasks[EQueueType::running].size(), mTasks[EQueueType::done].size(),
160  mTasks[EQueueType::failed].size(), mTimeStarted, mTimeFinished);
161  SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
162 }
163 
164 void Job::json(Json::Value & json)
165 {
169 
170  Json::Value d;
171  d["name"] = mUUID;
172  d["uid"] = mUid;
173  d["gid"] = mGid;
174  Json::UInt64 ts = static_cast<Json::UInt64>(mTimeStarted);
175  Json::UInt64 tf = static_cast<Json::UInt64>(mTimeFinished);
176  d["time"]["started"] = ts;
177  if (tf) d["time"]["finished"] = tf;
178  d["P"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::pending].size());
179  d["A"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::assigned].size());
180  d["R"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::running].size());
181  d["D"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::done].size());
182  d["F"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::failed].size());
183 
184  d["rc"]["done"] = Json::arrayValue;
185  for (auto ti : mTasks[EQueueType::done]) {
186  d["rc"]["done"].append(ti.second->taskid());
187  }
188  d["rc"]["failed"] = Json::arrayValue;
189  for (auto ti : mTasks[EQueueType::failed]) {
190  d["rc"]["failed"].append(ti.second->taskid());
191  }
192 
193  json.append(d);
194 }
195 
196 size_t Job::size(EQueueType type) const
197 {
202  if (type >= EQueueType::all) {
203  size_t sum = mTasks[EQueueType::pending].size();
204  sum += mTasks[EQueueType::assigned].size();
205  sum += mTasks[EQueueType::running].size();
206  sum += mTasks[EQueueType::done].size();
207  sum += mTasks[EQueueType::failed].size();
208  return sum;
209  }
210  else {
211 
212  return mTasks[type].size();
213  }
214 }
215 
216 size_t Job::sizeNotFinished() const
217 {
221  size_t sum = mTasks[EQueueType::pending].size();
222  sum += mTasks[EQueueType::assigned].size();
223  sum += mTasks[EQueueType::running].size();
224  return sum;
225 }
226 
227 void Job::consumer(std::string uuid)
228 {
233 }
234 
235 std::string Job::consumer() const
236 {
240  // TODO Also potentialy DANGEROUS
241  return std::move(mConsumerUUID);
242 }
243 
244 void Job::feeder(std::string uuid)
245 {
249  mFeederUUID = uuid;
250 }
251 
252 std::string Job::feeder() const
253 {
257  // TODO Potentialy DANGEROUS!
258  return std::move(mFeederUUID);
259 }
260 
261 bool Job::haveMoreTasks() const
262 {
266  return !mTasks[Job::pending].empty();
267 }
268 
270 {
274  if (sizeNotFinished() > 0) return false;
275 
276  mTimeFinished =
277  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
278 
279  return true;
280 }
281 
282 } // namespace Salsa
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:89
uint32_t JobID_t
Job ID type alias.
Definition: Job.hh:65
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:52
size_t sizeNotFinished() const
Definition: Job.cc:216
Job(std::string uuid="", std::string type="NONE")
Definition: Job.cc:4
std::string mFeederUUID
Feeder UUID.
Definition: Job.hh:88
bool isFinished()
Returns whether the job is finished.
Definition: Job.cc:269
EQueueType
Queue types.
Definition: Job.hh:19
std::string uuid() const
returns UUID
Definition: Job.hh:28
std::string mConsumerUUID
Source (consumer) UUID.
Definition: Job.hh:87
std::string mUUID
Job UUID.
Definition: Job.hh:84
std::map< uint32_t, TaskInfo * > mTasks[all]
Per-queue maps of tasks (task id to TaskInfo pointer).
Definition: Job.hh:83
virtual ~Job()
Definition: Job.cc:15
bool mChanged
Flag if job was changed.
Definition: Job.hh:96
std::string feeder() const
Definition: Job.cc:252
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:261
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:29
void json(Json::Value &json)
Definition: Job.cc:164
Base Salsa Object class.
Definition: Object.hh:15
std::string consumer() const
Definition: Job.cc:235
void print() const
Definition: Job.cc:153
uint32_t mUid
Job user id (nobody : 99)
Definition: Job.hh:85
uint32_t mGid
Job group id (nogroup : 99)
Definition: Job.hh:86
uint64_t mTimeFinished
Time finished.
Definition: Job.hh:91
uint64_t mTimeStarted
Time started.
Definition: Job.hh:90
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:121
TaskInfo * nextTask()
Definition: Job.cc:105
size_t size(EQueueType t=all) const
Definition: Job.cc:196
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:136