salsa  0.4.0
ConfigZyre.cc
1 #include "ConfigZyre.hh"
2 #include "NodeZyre.hh"
3 #include "PollerZmq.hh"
4 namespace Salsa {
6 {
10 }
12 {
16 }
17 
18 std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
19 {
23 
24  if (!targetActors) return nullptr;
25 
26  std::shared_ptr<Salsa::Node> node = nullptr;
27 
28  if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
29  SPD_ERROR("Salsa type is not [zyre] !!! ");
30  return nullptr;
31  }
32 
33  if (!mConfig["salsa"]["spec"]) {
34  SPD_ERROR("Salsa spec was not found !!!");
35  return nullptr;
36  }
37 
38  SPD_TRACE("Caching hostname via zsys_hostname()");
39  char * pHostnameCStr_ = zsys_hostname();
40  std::string hostname = "nohostname";
41  if (pHostnameCStr_) {
42  hostname = pHostnameCStr_;
43  free(pHostnameCStr_);
44  }
45  SPD_TRACE("Cached hostname [{}]", hostname);
46 
47  node = std::make_shared<Salsa::Node>("SALSA");
48  int nodeId = 0;
49 
50  for (auto spec : mConfig["salsa"]["spec"]) {
51 
52  if (!spec["nodes"]) {
53  SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
54  return nullptr;
55  }
56  auto name = spec["name"].as<std::string>();
57  bool found = false;
58  YAML::Node opt;
59 
60  if (mFilter.size() == 0) {
61  found = true;
62  }
63  else {
64  for (auto filter : mFilter) {
65  if (name == filter.first) {
66  opt = filter.second;
67  if (opt["replicas"]) {
68  spec["replicas"] = opt["replicas"].as<int>();
69  }
70  found = true;
71  }
72  }
73  }
74 
75  if (!found) continue;
76  SPD_TRACE("name [{}]", name);
77  int count = 1;
78 
79  if (spec["replicas"]) {
80  count = spec["replicas"].as<int>();
81  }
82  for (int iCount = 0; iCount < count; iCount++) {
83  // Create zyre node
84  std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
85  std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
86 
87  if (spec["jobinfo"]["group"]) {
88  SPD_INFO("Setting jobInfoGroup [{}] from config", spec["jobinfo"]["group"].as<std::string>());
89  pNodeZyre->jobInfoGroupName(spec["jobinfo"]["group"].as<std::string>());
90  }
91 
92  if (spec["jobinfo"]["broker"]["protocol"] && spec["jobinfo"]["broker"]["ip"] &&
93  spec["jobinfo"]["broker"]["port"]) {
94  std::string url = spec["jobinfo"]["broker"]["ip"].as<std::string>();
95  if (url.empty()) url = hostname;
96  url = fmt::format(">{}{}:{}", spec["jobinfo"]["broker"]["protocol"].as<std::string>(), url,
97  spec["jobinfo"]["broker"]["port"].as<std::string>());
98  SPD_INFO("Setting jobInfoBrokerUrl [{}] from config", url);
99  pNodeZyre->jobInfoBrokerUrl(url);
100  }
101  if (spec["jobinfo"]["client"]["protocol"] && spec["jobinfo"]["client"]["ip"] &&
102  spec["jobinfo"]["client"]["port"]) {
103  std::string url = spec["jobinfo"]["client"]["ip"].as<std::string>();
104  if (url.empty()) url = hostname;
105  url = fmt::format(">{}{}:{}", spec["jobinfo"]["client"]["protocol"].as<std::string>(), url,
106  spec["jobinfo"]["client"]["port"].as<std::string>());
107  SPD_INFO("Broker url for client : {}", url);
108  pNodeZyre->jobInfoClientUrl(url);
109  }
110  if (spec["timeout"]["poller"]) {
111  pNodeZyre->timeout(spec["timeout"]["poller"].as<int>());
112  SPD_INFO("Setting poller timeout [{}]", pNodeZyre->timeout());
113  }
114 
115  if (spec["timeout"]["jobfinished"]) {
116  SPD_INFO("Setting jobfinished timeout [{}]", spec["timeout"]["jobfinished"].as<std::string>());
117  setenv("SALSA_FINISHED_JOB_TIMEOUT", spec["timeout"]["jobfinished"].as<std::string>().data(), true);
118  }
119  if (spec["timeout"]["jobcheck"]) {
120  SPD_INFO("Setting jobcheck timeout [{}]", spec["timeout"]["jobcheck"].as<std::string>());
121  setenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec["timeout"]["jobcheck"].as<std::string>().data(), true);
122  }
123 
124  for (auto nodes : spec["nodes"]) {
125  SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
126  SPD_TRACE(" zyreName [{}]", zyreName);
127  SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
128 
129  applyOptions(nodes, opt);
130 
131  if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
132  if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
133  std::string url =
134  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
135  nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
136 
137  SPD_INFO("Submit : url [{}]", url);
138  zsock_t * s = zsock_new_router(url.c_str());
139  if (s == nullptr) {
140  SPD_CRIT("Failed to bind submitter on '{}' !!!", url);
141  return nullptr;
142  }
143  pNodeZyre->addSocket(static_cast<zsock_t *>(s));
144  std::string submit_url_client =
145  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(), hostname,
146  nodes["submit"]["port"].as<int>());
147  pNodeZyre->submitClientUrl(submit_url_client);
148  }
149  else {
150  Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
151  pNodeInfo->set_name(zyreName);
152  pNodeInfo->set_hostname(hostname);
153 
154  std::map<std::string, std::string> headers;
155  headers.insert(
156  std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
157 
158  // Create zyre socket for node
159  std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
160  std::make_shared<Salsa::SocketZyre>(zyreName, headers);
161 
162  applyOptions(nodes, opt);
163 
164  if (nodes["discovery"]["type"]) {
165  int port;
166  std::string url, endpoint;
167  if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
168 
169  std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
170 
171  if (discoveryType == "udp") {
172  port = 10000;
173  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
174  SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
175  pSocketZyre->port(port); // Set socket's port
176  }
177  else if (discoveryType == "gossip") {
178  std::string p, i;
179  port = 20000;
180  if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
181  if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
182  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
183 
184  url = fmt::format("{}://{}:{}", p, i, port);
185 
186  SPD_INFO("Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
187  if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
188 
189  if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
190  if (i == "$all") i = "*";
191  url = fmt::format("{}://{}:{}", p, i, port);
192  zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
193  }
194  else {
195  zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
196  }
197 
198  if (mConfig["salsa"]["options"]["evasive"]) {
199  SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
200  mConfig["salsa"]["options"]["evasive"].as<int>());
201  zyre_set_evasive_timeout(pSocketZyre->zyre(),
202  mConfig["salsa"]["options"]["evasive"].as<int>());
203  }
204  if (mConfig["salsa"]["options"]["expired"]) {
205  SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
206  mConfig["salsa"]["options"]["expired"].as<int>());
207  zyre_set_expired_timeout(pSocketZyre->zyre(),
208  mConfig["salsa"]["options"]["expired"].as<int>());
209  }
210  }
211  else {
212  SPD_WARN("No discovery type specified !!!");
213  }
214 
215  const char * zyreInterface = getenv("SALSA_INTERFACE");
216  if (zyreInterface && strcmp(zyreInterface, "")) {
217  SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
218  zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
219  }
220 
221  pSocketZyre->connect(); // Connect to socket
222  pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
223 
224  SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
225  "endpoint [{}]",
226  name, zyreName, discoveryType, url, port, endpoint);
227 
228  if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::info)) {
229  zyre_print(pSocketZyre->zyre());
230  }
231  }
232  node->add(pNodeZyre); // Add zyre node to main node
233  targetActors->push_back(pNodeZyre); // Add zyre node to actor index
234  nodeId++;
235  }
236  }
237  }
238  SPD_TRACE("---");
239  nodeId++;
240  }
241 
242  return node;
243 }
244 
245 void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
246 {
250  if (opt["type"] && src["discovery"]["type"]) {
251  src["discovery"]["type"] = opt["type"].as<std::string>();
252  }
253  if (opt["protocol"] && src["discovery"]["protocol"]) {
254  src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
255  }
256  if (opt["ip"] && src["discovery"]["ip"]) {
257  src["discovery"]["ip"] = opt["ip"].as<std::string>();
258  }
259  if (opt["port"] && src["discovery"]["port"]) {
260  src["discovery"]["port"] = opt["port"].as<int>();
261  }
262  if (opt["submitport"] && src["submit"]["port"]) {
263  src["submit"]["port"] = opt["submitport"].as<int>();
264  }
265 }
266 
267 } // namespace Salsa
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition: Config.hh:31
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Definition: ConfigZyre.cc:245
virtual ~ConfigZyre()
Definition: ConfigZyre.cc:11
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition: Object.hh:21
void filter(std::string const &f)
Definition: Config.cc:27
YAML::Node mConfig
YAML Configuration.
Definition: Config.hh:30
Base Config class.
Definition: Config.hh:17
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)
Definition: ConfigZyre.cc:18