aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq2edi')
-rw-r--r--src/zmq2edi/zmq2edi.cpp135
1 files changed, 73 insertions, 62 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3888d8a..7f34610 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -27,24 +27,27 @@
#include "Log.h"
#include "zmq.hpp"
-#include <math.h>
#include <getopt.h>
-#include <string.h>
+#include <cmath>
+#include <cstring>
+#include <chrono>
#include <iostream>
#include <iterator>
+#include <thread>
#include <vector>
#include "EDISender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr size_t MAX_NUM_RESETS = 180;
constexpr long ZMQ_TIMEOUT_MS = 1000;
static edi::configuration_t edi_conf;
static EDISender edisender;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -70,8 +73,9 @@ void usage(void)
cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
cerr << " -t <ttl> set the packet's TTL." << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -313,85 +317,92 @@ int start(int argc, char **argv)
const char* source_url = argv[optind];
-
- size_t frame_count = 0;
- size_t error_count = 0;
-
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (num_consecutive_resets < MAX_NUM_RESETS) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ const int framesize = dab_msg->buflen[i];
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- offset += framesize;
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
+
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ }
}
}
- }
- etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
+ }
return 0;
}