salsa  0.3.0
ConfigZyre.cc
1 #include "ConfigZyre.hh"
2 #include <NodeZyre.hh>
3 namespace Salsa {
5 {
9 }
11 {
15 }
16 
17 std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> & targetActors)
18 {
22 
23  std::shared_ptr<Salsa::Node> node = nullptr;
24 
25  if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
26  SPD_ERROR("Salsa type is not [zyre] !!! ");
27  return nullptr;
28  }
29 
30  if (!mConfig["salsa"]["spec"]) {
31  SPD_ERROR("Salsa spec was not found !!!");
32  return nullptr;
33  }
34 
35  SPD_TRACE("Caching hostname via zsys_hostname()");
36  char * pHostnameCStr_ = zsys_hostname();
37  std::string hostname = "nohostname";
38  if (pHostnameCStr_) {
39  hostname = pHostnameCStr_;
40  free(pHostnameCStr_);
41  }
42  SPD_TRACE("Cached hostname [{}]", hostname);
43 
44  node = std::make_shared<Salsa::Node>("SALSA");
45  int nodeId = 0;
46 
47  for (auto spec : mConfig["salsa"]["spec"]) {
48 
49  if (!spec["nodes"]) {
50  SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
51  return nullptr;
52  }
53  auto name = spec["name"].as<std::string>();
54  bool found = false;
55  YAML::Node opt;
56 
57  if (mFilter.size() == 0) {
58  found = true;
59  }
60  else {
61  for (auto filter : mFilter) {
62  if (name == filter.first) {
63  opt = filter.second;
64  if (opt["replicas"]) {
65  spec["replicas"] = opt["replicas"].as<int>();
66  }
67  found = true;
68  }
69  }
70  }
71 
72  if (!found) continue;
73  SPD_TRACE("name [{}]", name);
74  int count = 1;
75 
76  if (spec["replicas"]) {
77  count = spec["replicas"].as<int>();
78  }
79  for (int iCount = 0; iCount < count; iCount++) {
80  // Create zyre node
81  std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
82  std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
83 
84  for (auto nodes : spec["nodes"]) {
85  SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
86  SPD_TRACE(" zyreName [{}]", zyreName);
87  SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
88 
89  if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
90  if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
91  std::string url =
92  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
93  nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
94 
95  SPD_INFO("Submit : url [{}]", url);
96  pNodeZyre->addSocket(static_cast<zsock_t *>(zsock_new_router(url.c_str())));
97  }
98  else {
99  Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
100  pNodeInfo->set_name(zyreName);
101 
102  std::map<std::string, std::string> headers;
103  headers.insert(
104  std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
105 
106  // Create zyre socket for node
107  std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
108  std::make_shared<Salsa::SocketZyre>(zyreName, headers);
109 
110  applyOptions(nodes, opt);
111 
112  if (nodes["discovery"]["type"]) {
113  int port;
114  std::string url, endpoint;
115  if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
116 
117  std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
118 
119  if (discoveryType == "udp") {
120  port = 10000;
121  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
122  SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
123  pSocketZyre->port(port); // Set socket's port
124  }
125  else if (discoveryType == "gossip") {
126  std::string p, i;
127  port = 20000;
128  if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
129  if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
130  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
131 
132  url = fmt::format("{}://{}:{}", p, i, port);
133 
134  SPD_INFO("Using dicovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
135  if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
136 
137  if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
138  if (i == "$all") i = "*";
139  url = fmt::format("{}://{}:{}", p, i, port);
140  zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
141  }
142  else {
143  zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
144  }
145 
146  if (mConfig["salsa"]["options"]["evasive"]) {
147  SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
148  mConfig["salsa"]["options"]["evasive"].as<int>());
149  zyre_set_evasive_timeout(pSocketZyre->zyre(),
150  mConfig["salsa"]["options"]["evasive"].as<int>());
151  }
152  if (mConfig["salsa"]["options"]["expired"]) {
153  SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
154  mConfig["salsa"]["options"]["expired"].as<int>());
155  zyre_set_expired_timeout(pSocketZyre->zyre(),
156  mConfig["salsa"]["options"]["expired"].as<int>());
157  }
158  }
159  else {
160  SPD_WARN("No discovery type specified !!!");
161  }
162 
163  const char * zyreInterface = getenv("SALSA_INTERFACE");
164  if (zyreInterface && strcmp(zyreInterface, "")) {
165  SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
166  zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
167  }
168 
169  pSocketZyre->connect(); // Connect to socket
170  pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
171 
172  SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
173  "endpoint [{}]",
174  name, zyreName, discoveryType, url, port, endpoint);
175 
176  if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::warn)) {
177  zyre_print(pSocketZyre->zyre());
178  }
179  }
180  node->add(pNodeZyre); // Add zyre node to main node
181  targetActors.push_back(pNodeZyre); // Add zyre node to actor index
182  nodeId++;
183  }
184  }
185  }
186  SPD_TRACE("---");
187  nodeId++;
188  }
189 
190  return node;
191 }
192 
193 void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
194 {
198  if (opt["type"] && src["discovery"]["type"]) {
199  src["discovery"]["type"] = opt["type"].as<std::string>();
200  }
201  if (opt["protocol"] && src["discovery"]["protocol"]) {
202  src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
203  }
204  if (opt["ip"] && src["discovery"]["ip"]) {
205  src["discovery"]["ip"] = opt["ip"].as<std::string>();
206  }
207  if (opt["port"] && src["discovery"]["port"]) {
208  src["discovery"]["port"] = opt["port"].as<int>();
209  }
210 }
211 
212 } // namespace Salsa
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition: Config.hh:31
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Definition: ConfigZyre.cc:193
virtual ~ConfigZyre()
Definition: ConfigZyre.cc:10
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition: Object.hh:21
Definition: Actor.cc:2
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> &targetActors)
Definition: ConfigZyre.cc:17
void filter(std::string const &f)
Definition: Config.cc:27
YAML::Node mConfig
YAML Configuration.
Definition: Config.hh:30
Base Config class
Definition: Config.hh:17