aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2022-01-26 09:21:37 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2022-01-26 09:21:37 +0100
commit9d0266a00651db5a1d25b5a3e7d9c724725523c5 (patch)
treeb0444c5b4bb406800f2e7a20d347e66a850c858f
parent040a39445239dc33d4f532af68044c26e929be4a (diff)
downloadODR-EDI2EDI-9d0266a00651db5a1d25b5a3e7d9c724725523c5.tar.gz
ODR-EDI2EDI-9d0266a00651db5a1d25b5a3e7d9c724725523c5.tar.bz2
ODR-EDI2EDI-9d0266a00651db5a1d25b5a3e7d9c724725523c5.zip
Remove drop, improve behaviour with late sources
-rw-r--r--Makefile.am1
-rwxr-xr-xedi2edi_remote.py20
-rw-r--r--lib/Socket.cpp9
-rw-r--r--lib/Socket.h1
-rw-r--r--src/EDISender.cpp91
-rw-r--r--src/EDISender.h18
-rw-r--r--src/main.cpp144
-rw-r--r--src/main.h78
-rw-r--r--src/receiver.cpp145
-rw-r--r--src/receiver.h130
10 files changed, 370 insertions, 267 deletions
diff --git a/Makefile.am b/Makefile.am
index ac998f2..454a046 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -14,6 +14,7 @@ odr_edi2edi_CFLAGS =-Wall $(INCLUDE) $(GITVERSION_FLAGS)
odr_edi2edi_CXXFLAGS =-Wall -std=c++11 $(INCLUDE) $(GITVERSION_FLAGS)
odr_edi2edi_LDADD = -lpthread
odr_edi2edi_SOURCES = src/main.h src/main.cpp \
+ src/receiver.h src/receiver.cpp \
src/EDISender.h src/EDISender.cpp \
lib/crc.h lib/crc.c \
lib/Globals.cpp \
diff --git a/edi2edi_remote.py b/edi2edi_remote.py
index 0d61216..12ce5e0 100755
--- a/edi2edi_remote.py
+++ b/edi2edi_remote.py
@@ -10,9 +10,6 @@ parser = argparse.ArgumentParser(description="Remote Control for ODR-EDI2EDI")
parser.add_argument('-s', '--socket', type=str, help='UNIX DGRAM socket path to send to', required=True)
parser.add_argument('--get', action="store_true", help='Get and display current settings', required=False)
parser.add_argument('-w', '--delay', type=int, help='Set the delay to the given value in milliseconds', required=False)
-parser.add_argument('--drop', action="store_true", help='Enable dropping of late packets', required=False)
-parser.add_argument('--no-drop', action="store_true", help='Disable dropping of late packets', required=False)
-parser.add_argument('-x', '--drop-delay', type=int, help='Set the drop-delay to the given value in milliseconds', required=False)
parser.add_argument('-b', '--backoff', type=int, help='Set the backoff to the given value in milliseconds', required=False)
parser.add_argument('--list-inputs', action="store_true", help='List inputs and their settings', required=False)
@@ -21,11 +18,8 @@ parser.add_argument('--disable-input', type=str, help='Disable input specified b
cli_args = parser.parse_args()
-if cli_args.drop and cli_args.no_drop:
- print("You want to drop or not? Make up your mind!", file=sys.stderr)
- sys.exit(1)
-
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+s.settimeout(2)
MY_SOCK_PATH = "/tmp/edi2edi-remote"
if os.path.exists(MY_SOCK_PATH):
os.remove(MY_SOCK_PATH)
@@ -50,22 +44,10 @@ if cli_args.get:
print("Current settings:", file=sys.stderr)
send_command("get settings")
-if cli_args.drop:
- print("Enable late packet drop", file=sys.stderr)
- send_command("set drop-late 1")
-
-if cli_args.no_drop:
- print("Disable late packet drop", file=sys.stderr)
- send_command("set drop-late 0")
-
if cli_args.delay:
print(f"Setting delay to {cli_args.delay}", file=sys.stderr)
send_command(f"set delay {cli_args.delay}")
-if cli_args.drop_delay:
- print(f"Setting drop-delay to {cli_args.drop_delay}", file=sys.stderr)
- send_command(f"set drop-delay {cli_args.drop_delay}")
-
if cli_args.backoff:
print(f"Setting backoff to {cli_args.backoff}", file=sys.stderr)
send_command(f"set backoff {cli_args.backoff}")
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index d12c970..1ff6418 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -259,6 +259,15 @@ void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
}
}
+void UDPSocket::send(const std::string& data, InetAddress destination)
+{
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
+ destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
{
ip_mreqn group;
diff --git a/lib/Socket.h b/lib/Socket.h
index 3b27006..f5143a0 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -115,6 +115,7 @@ class UDPSocket
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
+ void send(const std::string& data, InetAddress destination);
UDPPacket receive(size_t max_size);
void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
diff --git a/src/EDISender.cpp b/src/EDISender.cpp
index b57ce65..3ed1666 100644
--- a/src/EDISender.cpp
+++ b/src/EDISender.cpp
@@ -27,7 +27,6 @@
#include "EDISender.h"
#include "Log.h"
-#include "ThreadsafeQueue.h"
#include <cmath>
#include <cstring>
#include <iomanip>
@@ -36,6 +35,9 @@
#include <algorithm>
#include <limits>
+// This is a remnant of the -x option
+static const bool DROP_LATE = true;
+
using namespace std;
EDISender::~EDISender()
@@ -63,31 +65,61 @@ void EDISender::update_settings(const EDISenderSettings& settings)
_settings = settings;
}
-void EDISender::push_tagpacket(tagpacket_t&& tp)
+void EDISender::push_tagpacket(tagpacket_t&& tp, Receiver* r)
{
+ stringstream ss;
+ ss << "EDISender ";
+ const auto t_now = chrono::system_clock::now();
+ const auto time_t_now = chrono::system_clock::to_time_t(t_now);
+ char timestr[100];
+ if (std::strftime(timestr, sizeof(timestr), "%Y-%m-%dZ%H:%M:%S", std::gmtime(&time_t_now))) {
+ ss << timestr;
+ }
+
+ using namespace chrono;
+ const auto t_frame = tp.timestamp.to_system_clock();
+ const auto t_release = t_frame + milliseconds(_settings.delay_ms);
+ const auto margin = t_release - t_now;
+ const auto margin_ms = chrono::duration_cast<chrono::milliseconds>(margin).count();
+ const bool late = t_release < t_now;
+
std::unique_lock<std::mutex> lock(_pending_tagpackets_mutex);
+ ss << " P " << _pending_tagpackets.size() << " dlfc " <<
+ tp.dlfc << " margin " << margin_ms << " from " << tp.hostnames;
+
bool inserted = false;
- for (auto it = _pending_tagpackets.begin(); it != _pending_tagpackets.end(); ++it) {
- if (tp.timestamp < it->timestamp) {
- _pending_tagpackets.insert(it, move(tp));
- inserted = true;
- break;
- }
- else if (tp.timestamp == it->timestamp) {
- if (tp.dlfc != it->dlfc) {
- etiLog.level(warn) << "Received packet " << tp.dlfc << " from "
- << tp.hostname << ":" << tp.port <<
- " with same timestamp but different DLFC than previous packet from "
- << it->hostname << ":" << it->port << " with " << it->dlfc;
+ if (not late) {
+ for (auto it = _pending_tagpackets.begin(); it != _pending_tagpackets.end(); ++it) {
+ if (tp.timestamp < it->timestamp) {
+ _pending_tagpackets.insert(it, move(tp));
+ inserted = true;
+ ss << " new";
+ break;
}
-
+ else if (tp.timestamp == it->timestamp) {
+ if (tp.dlfc != it->dlfc) {
+ ss << " dlfc err";
+ etiLog.level(warn) << "Received packet " << tp.dlfc << " from "
+ << tp.hostnames <<
+ " with same timestamp but different DLFC than previous packet from "
+ << it->hostnames << " with " << it->dlfc;
+ }
+ else {
+ ss << " dup";
+ it->hostnames += ";" + tp.hostnames;
+ }
#warning "TODO statistics"
- inserted = true;
- break;
+ inserted = true;
+ break;
+ }
}
}
+ else {
+ ss << " late";
+ r->num_late++;
+ }
if (not inserted) {
_pending_tagpackets.push_back(move(tp));
@@ -95,7 +127,16 @@ void EDISender::push_tagpacket(tagpacket_t&& tp)
if (_pending_tagpackets.size() > MAX_PENDING_TAGPACKETS) {
_pending_tagpackets.pop_front();
+ num_queue_dropped++;
+ ss << " Drop ";
}
+
+ lock.unlock();
+ ss << "\n";
+ Socket::UDPSocket udp;
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination("127.0.0.1", 8008);
+ udp.send(ss.str(), addr);
}
void EDISender::print_configuration()
@@ -117,27 +158,25 @@ void EDISender::send_tagpacket(tagpacket_t& tp)
const auto t_frame = tp.timestamp.to_system_clock();
const auto t_release = t_frame + milliseconds(_settings.delay_ms);
- const auto t_latest_release = t_frame + milliseconds(_settings.drop_delay_ms);
const auto t_now = system_clock::now();
- const bool slightly_late = t_release < t_now;
- const bool late = t_latest_release < t_now;
+ const bool late = t_release < t_now;
buffering_stat_t stat;
- stat.late = slightly_late;
+ stat.late = late;
- if (not slightly_late) {
+ if (not late) {
const auto wait_time = t_release - t_now;
std::this_thread::sleep_for(wait_time);
}
const auto t_now_steady = steady_clock::now();
stat.inhibited = t_now_steady < _output_inhibit_until;
- stat.dropped = late and _settings.drop_late;
+ stat.dropped = late and DROP_LATE;
stat.buffering_time_us = duration_cast<microseconds>(t_now_steady - tp.received_at).count();
_buffering_stats.push_back(std::move(stat));
- if (late and _settings.drop_late) {
+ if (late and DROP_LATE) {
return;
}
@@ -170,18 +209,16 @@ void EDISender::process()
{
while (_running.load()) {
tagpacket_t tagpacket;
- bool valid = false;
{
std::unique_lock<std::mutex> lock(_pending_tagpackets_mutex);
if (_pending_tagpackets.size() > 0) {
tagpacket = _pending_tagpackets.front();
- valid = true;
_pending_tagpackets.pop_front();
}
}
- if (not valid) {
+ if (tagpacket.tagpacket.empty()) {
this_thread::sleep_for(chrono::milliseconds(1));
continue;
}
diff --git a/src/EDISender.h b/src/EDISender.h
index c826f5d..b3ab905 100644
--- a/src/EDISender.h
+++ b/src/EDISender.h
@@ -34,6 +34,7 @@
#include <mutex>
#include <list>
#include <vector>
+#include "receiver.h"
#include "edioutput/TagItems.h"
#include "edioutput/TagPacket.h"
#include "edioutput/Transport.h"
@@ -41,22 +42,8 @@
static constexpr size_t MAX_PENDING_TAGPACKETS = 1000;
-struct tagpacket_t {
- // source information
- std::string hostname;
- int port;
-
- uint16_t dlfc;
- std::vector<uint8_t> tagpacket;
- EdiDecoder::frame_timestamp_t timestamp;
- std::chrono::steady_clock::time_point received_at;
- EdiDecoder::seq_info_t seq;
-};
-
struct EDISenderSettings {
int delay_ms = -500;
- bool drop_late = false;
- int drop_delay_ms = 0;
};
class EDISender {
@@ -67,7 +54,7 @@ class EDISender {
~EDISender();
void start(const edi::configuration_t& conf, const EDISenderSettings& settings);
void update_settings(const EDISenderSettings& settings);
- void push_tagpacket(tagpacket_t&& tagpacket);
+ void push_tagpacket(tagpacket_t&& tagpacket, Receiver* r);
void print_configuration(void);
private:
@@ -83,6 +70,7 @@ class EDISender {
// ordered by transmit timestamps
std::list<tagpacket_t> _pending_tagpackets;
+ size_t num_queue_dropped = 0;
mutable std::mutex _pending_tagpackets_mutex;
std::shared_ptr<edi::Sender> _edi_sender;
diff --git a/src/main.cpp b/src/main.cpp
index b884eed..2d0d031 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -45,8 +45,6 @@ using namespace std;
volatile sig_atomic_t running = 1;
-static constexpr auto RECONNECT_DELAY = chrono::milliseconds(24);
-
void signal_handler(int signum)
{
if (signum == SIGTERM) {
@@ -70,7 +68,6 @@ static void usage()
cerr << " -F <host:port> Add fallback input to given host and port using TCP.\n";
cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time.\n";
cerr << " Negative delay values are also allowed.\n";
- cerr << " -x <drop_delay> Drop frames where for which are too late, defined by the drop delay.\n";
cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0.\n";
cerr << " This is useful for checking that NTP is properly synchronised\n";
cerr << " -P Disable PFT and send AFPackets.\n";
@@ -98,98 +95,6 @@ static const struct option longopts[] = {
{0, 0, 0, 0}
};
-Receiver::Receiver(source_t& source, EDISender& edi_sender, bool verbose) :
- source(source),
- edi_sender(edi_sender),
- edi_decoder(*this)
-{
- edi_decoder.set_verbose(verbose);
-
- if (source.active) {
- etiLog.level(info) << "Connecting to TCP " << source.hostname << ":" << source.port;
- sock.connect(source.hostname, source.port, /*nonblock*/ true);
- }
-}
-
-void Receiver::update_fc_data(const EdiDecoder::eti_fc_data& fc_data) {
- dlfc = fc_data.dlfc;
-}
-
-void Receiver::assemble(EdiDecoder::ReceivedTagPacket&& tag_data) {
- tagpacket_t tp;
- tp.hostname = source.hostname;
- tp.port = source.port;
- tp.seq = tag_data.seq;
- tp.dlfc = dlfc;
- tp.tagpacket = move(tag_data.tagpacket);
- tp.received_at = chrono::steady_clock::now();
- tp.timestamp = move(tag_data.timestamp);
- margin = tp.timestamp.to_system_clock() - chrono::system_clock::now();
- edi_sender.push_tagpacket(move(tp));
-}
-
-void Receiver::tick()
-{
- if (source.active) {
- if (not sock.valid()) {
- if (reconnect_at < chrono::steady_clock::now()) {
- sock.connect(source.hostname, source.port, /*nonblock*/ true);
- reconnect_at += RECONNECT_DELAY;
- }
- }
- }
- else {
- if (sock.valid()) {
- etiLog.level(info) << "Disconnecting from TCP " << source.hostname << ":" << source.port;
- sock.close();
- }
- }
-}
-int Receiver::get_margin_ms() const {
- if (source.active) {
- using namespace chrono;
- return duration_cast<milliseconds>(margin).count();
- }
- else {
- return 0;
- }
-}
-
-void Receiver::receive()
-{
- const size_t bufsize = 32;
- vector<uint8_t> buf(bufsize);
- bool success = false;
- ssize_t ret = ::recv(get_sockfd(), buf.data(), buf.size(), 0);
- if (ret == -1) {
- if (errno == EINTR) {
- success = false;
- }
- else if (errno == ECONNREFUSED) {
- // Behave as if disconnected
- }
- else {
- string errstr(strerror(errno));
- throw runtime_error("TCP receive after poll() error: " + errstr);
- }
- }
- else if (ret > 0) {
- buf.resize(ret);
- edi_decoder.push_bytes(buf);
- success = true;
- }
- // ret == 0 means disconnected
-
- if (not success) {
- sock.close();
- reconnect_at = chrono::steady_clock::now() + RECONNECT_DELAY;
- }
- else {
- most_recent_rx_systime = chrono::system_clock::now();
- most_recent_rx_time = chrono::steady_clock::now();
- }
-}
-
int Main::start(int argc, char **argv)
{
@@ -233,8 +138,10 @@ int Main::start(int argc, char **argv)
}
const bool enabled = ch == 'c';
- const bool active = false; // Initialised once we know mode
- sources.push_back({optarg_s.substr(0, pos_colon), stoi(optarg_s.substr(pos_colon+1)), enabled, active});
+ sources.push_back({
+ optarg_s.substr(0, pos_colon),
+ stoi(optarg_s.substr(pos_colon+1)),
+ enabled});
}
break;
case 'C':
@@ -283,10 +190,6 @@ int Main::start(int argc, char **argv)
case 'w':
edisendersettings.delay_ms = stoi(optarg);
break;
- case 'x':
- edisendersettings.drop_late = true;
- edisendersettings.drop_delay_ms = stoi(optarg);
- break;
case 'h':
default:
usage();
@@ -330,9 +233,7 @@ int Main::start(int argc, char **argv)
return 1;
}
- etiLog.level(info) << "Setting up EDI2EDI with delay " << edisendersettings.delay_ms << " ms. " <<
- (edisendersettings.drop_late ? "Will" : "Will not") <<
- " drop late packets (" << edisendersettings.drop_delay_ms << " ms)";
+ etiLog.level(info) << "Setting up EDI2EDI with delay " << edisendersettings.delay_ms << " ms. ";
if (not rc_socket_name.empty()) {
try {
@@ -347,7 +248,8 @@ int Main::start(int argc, char **argv)
receivers.reserve(16); // Ensure the receivers don't get moved around, as their edi_decoder needs their address
for (auto& source : sources) {
- receivers.emplace_back(source, edisender, edi_conf.verbose);
+ auto callback = [&](tagpacket_t&& tp, Receiver* r) { edisender.push_tagpacket(move(tp), r); };
+ receivers.emplace_back(source, callback, edi_conf.verbose);
}
@@ -661,8 +563,6 @@ string Main::handle_rc_command(const string& cmd)
using namespace chrono;
stringstream ss;
ss << "{ \"delay\": " << edisendersettings.delay_ms <<
- ", \"drop-late\": " << (edisendersettings.drop_late ? "true" : "false") <<
- ", \"drop-delay\": " << edisendersettings.drop_delay_ms <<
", \"backoff\": " << duration_cast<milliseconds>(backoff).count() <<
"}";
r = ss.str();
@@ -679,9 +579,15 @@ string Main::handle_rc_command(const string& cmd)
" \"hostname\": \"" << it->source.hostname << "\"," <<
" \"port\": " << it->source.port << "," <<
" \"last_packet_received_at\": " << rx_packet_time << "," <<
- " \"margin\": " << it->get_margin_ms() << "," <<
+ " \"connected\": " << (it->source.connected ? "true" : "false") << "," <<
" \"active\": " << (it->source.active ? "true" : "false") << "," <<
- " \"enabled\": " << (it->source.enabled ? "true" : "false");
+ " \"enabled\": " << (it->source.enabled ? "true" : "false") << ",";
+
+ ss << " \"stats\": {" <<
+ " \"margin\": " << it->get_margin_ms() << "," <<
+ " \"num_late\": " << it->num_late << "," <<
+ " \"num_connects\": " << it->source.num_connects <<
+ " }";
++it;
if (it == receivers.end()) {
@@ -737,26 +643,6 @@ string Main::handle_rc_command(const string& cmd)
edisender.update_settings(edisendersettings);
etiLog.level(info) << "RC setting delay to " << value;
}
- else if (cmd.rfind("set drop-delay ", 0) == 0) {
- auto value = stoi(cmd.substr(15, cmd.size()));
- if (value < -100000 or value > 100000) {
- throw invalid_argument("delay value out of bounds +/- 100s");
- }
- edisendersettings.drop_delay_ms = value;
- edisender.update_settings(edisendersettings);
- etiLog.level(info) << "RC setting drop-delay to " << value;
- }
- else if (cmd.rfind("set drop-late ", 0) == 0) {
- auto value = stoi(cmd.substr(14, cmd.size()));
- if (value == 0 or value == 1) {
- edisendersettings.drop_late = value;
- edisender.update_settings(edisendersettings);
- etiLog.level(info) << "RC setting drop-late to " << value;
- }
- else {
- throw invalid_argument("value must be 0 or 1");
- }
- }
else if (cmd.rfind("set backoff ", 0) == 0) {
auto value = stoi(cmd.substr(12, cmd.size()));
if (value < 0 or value > 100000) {
diff --git a/src/main.h b/src/main.h
index 43c5cbb..d98bd82 100644
--- a/src/main.h
+++ b/src/main.h
@@ -29,93 +29,17 @@
#include <vector>
#include <cmath>
#include <cstring>
-#include "edi/ETIDecoder.hpp"
+#include "receiver.h"
#include "EDISender.h"
#include "edioutput/TagItems.h"
#include "edioutput/TagPacket.h"
#include "edioutput/Transport.h"
-#include "edi/ETIDecoder.hpp"
constexpr long DEFAULT_BACKOFF = 5000;
constexpr long DEFAULT_SWITCH_DELAY = 2000;
void signal_handler(int signum);
-struct source_t {
- std::string hostname;
- int port;
-
- // User-controlled setting
- bool enabled;
-
- // Mode merging: active will be set for all enabled inputs.
- // Mode switching: only one input will be active
- bool active;
-};
-
-class Receiver : public EdiDecoder::ETIDataCollector {
- public:
- Receiver(source_t& source, EDISender& edisender, bool verbose);
- Receiver(const Receiver&) = delete;
- Receiver operator=(const Receiver&) = delete;
- Receiver(Receiver&&) = default;
- Receiver& operator=(Receiver&&) = delete;
-
- // Tell the ETIWriter what EDI protocol we receive in *ptr.
- // This is not part of the ETI data, but is used as check
- virtual void update_protocol(
- const std::string& proto,
- uint16_t major,
- uint16_t minor) override { }
-
- // Update the data for the frame characterisation
- virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data) override;
-
- // Ignore most events because we are interested in retransmitting EDI, not
- // decoding it
- virtual void update_fic(std::vector<uint8_t>&& fic) override { }
- virtual void update_err(uint8_t err) override { }
- virtual void update_edi_time(uint32_t utco, uint32_t seconds) override { }
- virtual void update_mnsc(uint16_t mnsc) override { }
- virtual void update_rfu(uint16_t rfu) override { }
- virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc) override { }
-
- // Tell the ETIWriter that the AFPacket is complete
- virtual void assemble(EdiDecoder::ReceivedTagPacket&& tag_data) override;
-
- // Must return -1 if the socket is not poll()able
- int get_sockfd() const { return sock.get_sockfd(); }
-
- void receive();
- void tick();
- int get_margin_ms() const;
-
- std::chrono::system_clock::time_point get_systime_last_packet() const
- {
- return most_recent_rx_systime;
- }
-
- std::chrono::steady_clock::time_point get_time_last_packet() const
- {
- return most_recent_rx_time;
- }
-
- source_t& source;
-
- private:
- EDISender& edi_sender;
- EdiDecoder::ETIDecoder edi_decoder;
- uint16_t dlfc = 0;
-
- std::chrono::steady_clock::time_point reconnect_at = std::chrono::steady_clock::now();
- std::chrono::steady_clock::time_point most_recent_rx_time = std::chrono::steady_clock::now();
- std::chrono::system_clock::time_point most_recent_rx_systime = std::chrono::system_clock::now();
-
- std::chrono::system_clock::duration margin = std::chrono::system_clock::duration::zero();
-
- Socket::TCPSocket sock;
-};
-
class Main {
public:
int start(int argc, char **argv);
diff --git a/src/receiver.cpp b/src/receiver.cpp
new file mode 100644
index 0000000..d822fa3
--- /dev/null
+++ b/src/receiver.cpp
@@ -0,0 +1,145 @@
+/*
+ Copyright (C) 2022
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of the ODR-mmbTools.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include <algorithm>
+#include <chrono>
+#include <iostream>
+#include <iterator>
+#include <memory>
+#include <thread>
+#include <vector>
+#include <unordered_map>
+#include <cmath>
+#include <cstring>
+#include <fcntl.h>
+#include <getopt.h>
+#include <poll.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include "Log.h"
+#include "receiver.h"
+
+using namespace std;
+
+static constexpr auto RECONNECT_DELAY = chrono::milliseconds(24);
+
+Receiver::Receiver(source_t& source, std::function<void(tagpacket_t&& tagpacket, Receiver*)> push_tagpacket, bool verbose) :
+ source(source),
+ push_tagpacket_callback(push_tagpacket),
+ edi_decoder(*this)
+{
+ edi_decoder.set_verbose(verbose);
+
+ if (source.active) {
+ etiLog.level(info) << "Connecting to TCP " << source.hostname << ":" << source.port;
+ sock.connect(source.hostname, source.port, /*nonblock*/ true);
+ }
+}
+
+void Receiver::update_fc_data(const EdiDecoder::eti_fc_data& fc_data) {
+ dlfc = fc_data.dlfc;
+}
+
+void Receiver::assemble(EdiDecoder::ReceivedTagPacket&& tag_data) {
+ tagpacket_t tp;
+ tp.hostnames = source.hostname;
+ tp.seq = tag_data.seq;
+ tp.dlfc = dlfc;
+ tp.tagpacket = move(tag_data.tagpacket);
+ tp.received_at = chrono::steady_clock::now();
+ tp.timestamp = move(tag_data.timestamp);
+ margin = tp.timestamp.to_system_clock() - chrono::system_clock::now();
+ push_tagpacket_callback(move(tp), this);
+}
+
+void Receiver::tick()
+{
+ if (source.active) {
+ if (not sock.valid()) {
+ if (reconnect_at < chrono::steady_clock::now()) {
+ sock.connect(source.hostname, source.port, /*nonblock*/ true);
+ // Mark connected = true only on successful data receive because of nonblock=true
+ reconnect_at += RECONNECT_DELAY;
+ }
+ }
+ }
+ else {
+ if (sock.valid()) {
+ etiLog.level(info) << "Disconnecting from TCP " << source.hostname << ":" << source.port;
+ sock.close();
+ source.connected = false;
+ }
+ }
+}
+int Receiver::get_margin_ms() const {
+ if (source.active) {
+ using namespace chrono;
+ return duration_cast<milliseconds>(margin).count();
+ }
+ else {
+ return 0;
+ }
+}
+
+void Receiver::receive()
+{
+ const size_t bufsize = 32;
+ vector<uint8_t> buf(bufsize);
+ bool success = false;
+ ssize_t ret = ::recv(get_sockfd(), buf.data(), buf.size(), 0);
+ if (ret == -1) {
+ if (errno == EINTR) {
+ success = false;
+ }
+ else if (errno == ECONNREFUSED) {
+ // Behave as if disconnected
+ }
+ else {
+ string errstr(strerror(errno));
+ throw runtime_error("TCP receive after poll() error: " + errstr);
+ }
+ }
+ else if (ret > 0) {
+ buf.resize(ret);
+ edi_decoder.push_bytes(buf);
+ success = true;
+ }
+ // ret == 0 means disconnected
+
+ if (not success) {
+ sock.close();
+ source.connected = false;
+ reconnect_at = chrono::steady_clock::now() + RECONNECT_DELAY;
+ }
+ else {
+ most_recent_rx_systime = chrono::system_clock::now();
+ most_recent_rx_time = chrono::steady_clock::now();
+ if (not source.connected) {
+ source.num_connects++;
+ }
+ source.connected = true;
+ }
+}
+
diff --git a/src/receiver.h b/src/receiver.h
new file mode 100644
index 0000000..7ab55a3
--- /dev/null
+++ b/src/receiver.h
@@ -0,0 +1,130 @@
+/*
+ Copyright (C) 2022
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of the ODR-mmbTools.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+#include <chrono>
+#include <iostream>
+#include <iterator>
+#include <memory>
+#include <thread>
+#include <vector>
+#include <cmath>
+#include <cstring>
+#include "Socket.h"
+#include "edi/ETIDecoder.hpp"
+
+
+struct tagpacket_t {
+ // source information
+ std::string hostnames;
+
+ uint16_t dlfc;
+ std::vector<uint8_t> tagpacket;
+ EdiDecoder::frame_timestamp_t timestamp;
+ std::chrono::steady_clock::time_point received_at;
+ EdiDecoder::seq_info_t seq;
+};
+
+struct source_t {
+ source_t(std::string hostname, int port, bool enabled) : hostname(hostname), port(port), enabled(enabled) {}
+ std::string hostname;
+ int port;
+
+ // User-controlled setting
+ bool enabled;
+
+ // Mode merging: active will be set for all enabled inputs.
+ // Mode switching: only one input will be active
+ bool active = false;
+
+ bool connected = false;
+
+ ssize_t num_connects = 0;
+};
+
+class Receiver : public EdiDecoder::ETIDataCollector {
+ public:
+ Receiver(source_t& source, std::function<void(tagpacket_t&& tagpacket, Receiver*)> push_tagpacket, bool verbose);
+ Receiver(const Receiver&) = delete;
+ Receiver operator=(const Receiver&) = delete;
+ Receiver(Receiver&&) = default;
+ Receiver& operator=(Receiver&&) = delete;
+
+ // Tell the ETIWriter what EDI protocol we receive in *ptr.
+ // This is not part of the ETI data, but is used as check
+ virtual void update_protocol(
+ const std::string& proto,
+ uint16_t major,
+ uint16_t minor) override { }
+
+ // Update the data for the frame characterisation
+ virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data) override;
+
+ // Ignore most events because we are interested in retransmitting EDI, not
+ // decoding it
+ virtual void update_fic(std::vector<uint8_t>&& fic) override { }
+ virtual void update_err(uint8_t err) override { }
+ virtual void update_edi_time(uint32_t utco, uint32_t seconds) override { }
+ virtual void update_mnsc(uint16_t mnsc) override { }
+ virtual void update_rfu(uint16_t rfu) override { }
+ virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc) override { }
+
+ // Tell the ETIWriter that the AFPacket is complete
+ virtual void assemble(EdiDecoder::ReceivedTagPacket&& tag_data) override;
+
+ // Must return -1 if the socket is not poll()able
+ int get_sockfd() const { return sock.get_sockfd(); }
+
+ void receive();
+ void tick();
+ int get_margin_ms() const;
+
+ std::chrono::system_clock::time_point get_systime_last_packet() const
+ {
+ return most_recent_rx_systime;
+ }
+
+ std::chrono::steady_clock::time_point get_time_last_packet() const
+ {
+ return most_recent_rx_time;
+ }
+
+ source_t& source;
+
+ ssize_t num_late = 0;
+
+ private:
+ std::function<void(tagpacket_t&& tagpacket, Receiver*)> push_tagpacket_callback;
+ EdiDecoder::ETIDecoder edi_decoder;
+ uint16_t dlfc = 0;
+
+ std::chrono::steady_clock::time_point reconnect_at = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point most_recent_rx_time = std::chrono::steady_clock::time_point();
+ std::chrono::system_clock::time_point most_recent_rx_systime = std::chrono::system_clock::time_point();
+
+ std::chrono::system_clock::duration margin = std::chrono::system_clock::duration::zero();
+
+
+ Socket::TCPSocket sock;
+};
+