diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-05-05 11:24:21 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-05-05 11:24:53 +0200 |
commit | 367b854e7bbf758cb3119162dece4669d00dabb1 (patch) | |
tree | dd93700aec3b34b227b463c783385ddf09b4346a /src/main.cpp | |
parent | a345b9f848750099a2631ee47babcc10035381c4 (diff) | |
download | ODR-EDI2EDI-367b854e7bbf758cb3119162dece4669d00dabb1.tar.gz ODR-EDI2EDI-367b854e7bbf758cb3119162dece4669d00dabb1.tar.bz2 ODR-EDI2EDI-367b854e7bbf758cb3119162dece4669d00dabb1.zip |
Get EDI input working
Diffstat (limited to 'src/main.cpp')
-rw-r--r-- | src/main.cpp | 416 |
1 files changed, 230 insertions, 186 deletions
diff --git a/src/main.cpp b/src/main.cpp index 3005303..b3a0519 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -32,13 +32,12 @@ #include "edioutput/TagItems.h" #include "edioutput/TagPacket.h" #include "edioutput/Transport.h" +#include "edi/ETIDecoder.hpp" using namespace std; constexpr long DEFAULT_BACKOFF = 5000; -static edi::configuration_t edi_conf; - static void usage() { cerr << "Usage:" << endl; @@ -73,218 +72,262 @@ static void usage() /* There is some state inside the parsing of destination arguments, * because several destinations can be given. */ -static std::shared_ptr<edi::udp_destination_t> edi_destination; -static bool source_port_set = false; -static bool source_addr_set = false; -static bool ttl_set = false; -static bool dest_addr_set = false; - -static void add_edi_destination(void) -{ - if (not dest_addr_set) { - throw std::runtime_error("Destination address not specified for destination number " + - std::to_string(edi_conf.destinations.size() + 1)); - } - - edi_conf.destinations.push_back(move(edi_destination)); - edi_destination = std::make_shared<edi::udp_destination_t>(); - - source_port_set = false; - source_addr_set = false; - ttl_set = false; - dest_addr_set = false; -} -static void parse_destination_args(char option) -{ - if (not edi_destination) { - edi_destination = std::make_shared<edi::udp_destination_t>(); - } +class Main : public EdiDecoder::ETIDataCollector { + public: + Main() : edi_decoder(*this) + { + } - switch (option) { - case 's': - if (source_port_set) { - add_edi_destination(); - } - edi_destination->source_port = std::stoi(optarg); - source_port_set = true; - break; - case 'S': - if (source_addr_set) { - add_edi_destination(); - } - edi_destination->source_addr = optarg; - source_addr_set = true; - break; - case 't': - if (ttl_set) { - add_edi_destination(); + // Tell the ETIWriter what EDI protocol we receive in *ptr. + // This is not part of the ETI data, but is used as check + virtual void update_protocol( + const std::string& proto, + uint16_t major, + uint16_t minor) override { } + + // Update the data for the frame characterisation + virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data) override { } + virtual void update_fic(std::vector<uint8_t>&& fic) override { } + virtual void update_err(uint8_t err) override { } + + // In addition to TSTA in ETI, EDI also transports more time + // stamp information. + virtual void update_edi_time(uint32_t utco, uint32_t seconds) override { } + virtual void update_mnsc(uint16_t mnsc) override { } + virtual void update_rfu(uint16_t rfu) override { } + virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc) override { } + + // Tell the ETIWriter that the AFPacket is complete + virtual void assemble(EdiDecoder::ReceivedTagData&& tag_data) override + { + etiLog.level(info) << "Received " << tag_data.all_tags.size() << " tags at " << tag_data.timestamp.to_unix_epoch(); + + for (const auto& tag : tag_data.all_tags) { + etiLog.level(debug) << " TAG " << EdiDecoder::tag_name_to_human_readable(tag.name); } - edi_destination->ttl = std::stoi(optarg); - ttl_set = true; - break; - case 'd': - if (dest_addr_set) { - add_edi_destination(); - } - edi_destination->dest_addr = optarg; - dest_addr_set = true; - break; - default: - throw std::logic_error("parse_destination_args invalid"); - } -} + } -static int start(int argc, char **argv) -{ - edi_conf.enable_pft = true; + int start(int argc, char **argv) + { + edi_conf.enable_pft = true; - if (argc == 0) { - usage(); - return 1; - } + if (argc == 0) { + usage(); + return 1; + } - int delay_ms = 500; - bool drop_late_packets = false; - uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; - std::string startupcheck; - std::string source; - - int ch = 0; - while (ch != -1) { - ch = getopt(argc, argv, "c:C:d:p:s:S:t:Pf:i:Dva:b:w:xh"); - switch (ch) { - case -1: - break; - case 'c': - source = optarg; - break; - case 'C': - startupcheck = optarg; - break; - case 'd': - case 's': - case 'S': - case 't': - parse_destination_args(ch); - break; - case 'p': - edi_conf.dest_port = std::stoi(optarg); - break; - case 'P': - edi_conf.enable_pft = false; - break; - case 'f': - edi_conf.fec = std::stoi(optarg); - break; - case 'i': - { - double interleave_ms = std::stod(optarg); - if (interleave_ms != 0.0) { - if (interleave_ms < 0) { - throw std::runtime_error("EDI output: negative interleave value is invalid."); + int ch = 0; + while (ch != -1) { + ch = getopt(argc, argv, "c:C:d:p:s:S:t:Pf:i:Dva:b:w:xh"); + switch (ch) { + case -1: + break; + case 'c': + source = optarg; + break; + case 'C': + startupcheck = optarg; + break; + case 'd': + case 's': + case 'S': + case 't': + parse_destination_args(ch); + break; + case 'p': + edi_conf.dest_port = std::stoi(optarg); + break; + case 'P': + edi_conf.enable_pft = false; + break; + case 'f': + edi_conf.fec = std::stoi(optarg); + break; + case 'i': + { + double interleave_ms = std::stod(optarg); + if (interleave_ms != 0.0) { + if (interleave_ms < 0) { + throw std::runtime_error("EDI output: negative interleave value is invalid."); + } + + auto latency_rounded = lround(interleave_ms / 24.0); + if (latency_rounded * 24 > 30000) { + throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); + } + + edi_conf.latency_frames = latency_rounded; + } } + break; + case 'D': + edi_conf.dump = true; + break; + case 'v': + edi_conf.verbose = true; + break; + case 'a': + edi_conf.tagpacket_alignment = std::stoi(optarg); + break; + case 'b': + backoff_after_reset_ms = std::stoi(optarg); + break; + case 'w': + delay_ms = std::stoi(optarg); + break; + case 'x': + drop_late_packets = true; + break; + case 'h': + default: + usage(); + return 1; + } + } - auto latency_rounded = lround(interleave_ms / 24.0); - if (latency_rounded * 24 > 30000) { - throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); - } + if (not startupcheck.empty()) { + etiLog.level(info) << "Running startup check '" << startupcheck << "'"; + int wstatus = system(startupcheck.c_str()); - edi_conf.latency_frames = latency_rounded; + if (WIFEXITED(wstatus)) { + if (WEXITSTATUS(wstatus) == 0) { + etiLog.level(info) << "Startup check ok"; + } + else { + etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); + return 1; } } - break; - case 'D': - edi_conf.dump = true; - break; - case 'v': - edi_conf.verbose = true; - break; - case 'a': - edi_conf.tagpacket_alignment = std::stoi(optarg); - break; - case 'b': - backoff_after_reset_ms = std::stoi(optarg); - break; - case 'w': - delay_ms = std::stoi(optarg); - break; - case 'x': - drop_late_packets = true; - break; - case 'h': - default: - usage(); + else { + etiLog.level(error) << "Startup check failed, child didn't terminate normally"; + return 1; + } + } + + add_edi_destination(); + + if (source.empty()) { + etiLog.level(error) << "source option is missing"; return 1; - } - } + } + + const auto pos_colon = source.find(":"); + if (pos_colon == string::npos or pos_colon == 0) { + etiLog.level(error) << "source does not contain host:port"; + return 1; + } - if (not startupcheck.empty()) { - etiLog.level(info) << "Running startup check '" << startupcheck << "'"; - int wstatus = system(startupcheck.c_str()); + const string connect_to_host = source.substr(0, pos_colon); + const int connect_to_port = stod(source.substr(pos_colon+1)); - if (WIFEXITED(wstatus)) { - if (WEXITSTATUS(wstatus) == 0) { - etiLog.level(info) << "Startup check ok"; + if (edi_conf.dest_port == 0) { + etiLog.level(error) << "No EDI destination port defined"; + return 1; } - else { - etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); + + if (edi_conf.destinations.empty()) { + etiLog.level(error) << "No EDI destinations set"; return 1; } - } - else { - etiLog.level(error) << "Startup check failed, child didn't terminate normally"; - return 1; - } - } - add_edi_destination(); + edi_decoder.set_verbose(edi_conf.verbose); + etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " << + (drop_late_packets ? "Will" : "Will not") << " drop late packets"; - if (source.empty()) { - etiLog.level(error) << "source option is missing"; - return 1; - } - const auto pos_colon = source.find(":"); - if (pos_colon == string::npos or pos_colon == 0) { - etiLog.level(error) << "source does not contain host:port"; - return 1; - } + Socket::TCPSocket sock; + etiLog.level(info) << "Connecting to TCP " << connect_to_host << ":" << connect_to_port; + sock.connect(connect_to_host, connect_to_port); - const string connect_to_host = source.substr(0, pos_colon-1); - const int connect_to_port = stod(source.substr(pos_colon+1)); + ssize_t ret = 0; + do { + const size_t bufsize = 32; + std::vector<uint8_t> buf(bufsize); + ret = sock.recv(buf.data(), buf.size(), 0); + if (ret > 0) { + buf.resize(ret); + std::vector<uint8_t> frame; + edi_decoder.push_bytes(buf); + } + } while (ret > 0); - if (edi_conf.dest_port == 0) { - etiLog.level(error) << "No EDI destination port defined"; - return 1; - } + return 0; + } - if (edi_conf.destinations.empty()) { - etiLog.level(error) << "No EDI destinations set"; - return 1; - } + private: - etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " << - (drop_late_packets ? "Will" : "Will not") << " drop late packets"; + void add_edi_destination(void) + { + if (not dest_addr_set) { + throw std::runtime_error("Destination address not specified for destination number " + + std::to_string(edi_conf.destinations.size() + 1)); + } + edi_conf.destinations.push_back(move(edi_destination)); + edi_destination = std::make_shared<edi::udp_destination_t>(); - Socket::TCPSocket sock; - etiLog.level(info) << "Connecting to TCP " << connect_to_host << ":" << connect_to_port; - sock.connect(connect_to_host, connect_to_port); + source_port_set = false; + source_addr_set = false; + ttl_set = false; + dest_addr_set = false; + } - ssize_t ret = 0; - do { - const size_t bufsize = 32; - std::vector<uint8_t> buf(bufsize); - ret = sock.recv(buf.data(), buf.size(), 0); - if (ret > 0) { - buf.resize(ret); - std::vector<uint8_t> frame; - decoder.push_bytes(buf); + void parse_destination_args(char option) + { + if (not edi_destination) { + edi_destination = std::make_shared<edi::udp_destination_t>(); + } + + switch (option) { + case 's': + if (source_port_set) { + add_edi_destination(); + } + edi_destination->source_port = std::stoi(optarg); + source_port_set = true; + break; + case 'S': + if (source_addr_set) { + add_edi_destination(); + } + edi_destination->source_addr = optarg; + source_addr_set = true; + break; + case 't': + if (ttl_set) { + add_edi_destination(); + } + edi_destination->ttl = std::stoi(optarg); + ttl_set = true; + break; + case 'd': + if (dest_addr_set) { + add_edi_destination(); + } + edi_destination->dest_addr = optarg; + dest_addr_set = true; + break; + default: + throw std::logic_error("parse_destination_args invalid"); + } } - } while (ret > 0); -} + + std::shared_ptr<edi::udp_destination_t> edi_destination; + bool source_port_set = false; + bool source_addr_set = false; + bool ttl_set = false; + bool dest_addr_set = false; + edi::configuration_t edi_conf; + EdiDecoder::ETIDecoder edi_decoder; + int delay_ms = 500; + bool drop_late_packets = false; + uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; + std::string startupcheck; + std::string source; +}; + int main(int argc, char **argv) { @@ -299,7 +342,8 @@ int main(int argc, char **argv) int ret = 1; try { - ret = start(argc, argv); + Main m; + ret = m.start(argc, argv); // To make sure things get printed to stderr std::this_thread::sleep_for(std::chrono::milliseconds(300)); |