aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/Socket.cpp122
-rw-r--r--lib/Socket.h1
-rw-r--r--lib/edi/PFT.cpp8
-rw-r--r--lib/edi/common.cpp93
-rw-r--r--lib/edi/common.hpp19
-rw-r--r--src/EDISender.cpp106
-rw-r--r--src/EDISender.h29
-rw-r--r--src/main.cpp21
8 files changed, 290 insertions, 109 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index c876f32..d12c970 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -409,6 +409,121 @@ bool TCPSocket::valid() const
return m_sock != -1;
}
+void TCPSocket::connect(const std::string& hostname, int port, int timeout_ms)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only connect an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ /* Obtain address(es) matching host/port */
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(hostname.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ int flags = 0;
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully connect(2).
+ If socket(2) (or connect(2)) fails, we (close the socket
+ and) try the next address. */
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype,
+ rp->ai_protocol);
+ if (sfd == -1)
+ continue;
+
+ flags = fcntl(sfd, F_GETFL);
+ if (flags == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not get socket flags: " + errstr);
+ }
+
+ if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
+ if (ret == 0) {
+ m_sock = sfd;
+ break;
+ }
+ if (ret == -1 and errno == EINPROGRESS) {
+ m_sock = sfd;
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLOUT;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ std::string errstr(strerror(errno));
+ ::close(m_sock);
+ freeaddrinfo(result);
+ throw runtime_error("TCP: connect error on poll: " + errstr);
+ }
+ else if (retval > 0) {
+ int so_error = 0;
+ socklen_t len = sizeof(so_error);
+
+ if (getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &so_error, &len) == -1) {
+ std::string errstr(strerror(errno));
+ ::close(m_sock);
+ freeaddrinfo(result);
+ throw runtime_error("TCP: getsockopt error connect: " + errstr);
+ }
+
+ if (so_error == 0) {
+ break;
+ }
+ }
+ else {
+ ::close(m_sock);
+ freeaddrinfo(result);
+ throw runtime_error("Timeout on connect");
+ }
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
+ == SOCKET_ERROR) {
+ throw runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+ }
+
+ // Don't keep the socket blocking
+ if (fcntl(m_sock, F_SETFL, flags) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not connect");
+ }
+}
+
void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
{
if (m_sock != INVALID_SOCKET) {
@@ -447,11 +562,15 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
int flags = fcntl(sfd, F_GETFL);
if (flags == -1) {
std::string errstr(strerror(errno));
+ freeaddrinfo(result);
+ ::close(sfd);
throw std::runtime_error("TCP: Could not get socket flags: " + errstr);
}
if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
std::string errstr(strerror(errno));
+ freeaddrinfo(result);
+ ::close(sfd);
throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
}
}
@@ -480,7 +599,6 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
if (rp == nullptr) {
throw runtime_error("Could not connect");
}
-
}
void TCPSocket::listen(int port, const string& name)
@@ -936,10 +1054,12 @@ void TCPReceiveServer::process()
sock.close();
// TODO replace fprintf
fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what());
+ m_queue.push(make_shared<TCPReceiveMessageDisconnected>());
}
if (num_timeouts > max_num_timeouts) {
sock.close();
+ m_queue.push(make_shared<TCPReceiveMessageDisconnected>());
}
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index 33cdc05..08607a5 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -168,6 +168,7 @@ class TCPSocket {
bool valid(void) const;
void connect(const std::string& hostname, int port, bool nonblock = false);
+ void connect(const std::string& hostname, int port, int timeout_ms);
void listen(int port, const std::string& name);
void close(void);
diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp
index 25f2d1f..2877aa5 100644
--- a/lib/edi/PFT.cpp
+++ b/lib/edi/PFT.cpp
@@ -453,11 +453,11 @@ std::vector<uint8_t> AFBuilder::extractAF()
}
// EDI specific, must have a CRC.
- if( _af_packet.size() >= 12 ) {
+ if (_af_packet.size() >= 12) {
ok = checkCRC(_af_packet.data(), _af_packet.size());
if (not ok) {
- etiLog.log(debug, "Too many errors to reconstruct AF from %zu/%u"
+ etiLog.log(debug, "CRC error after AF reconstruction from %zu/%u"
" PFT fragments\n", _fragments.size(), _Fcount);
}
}
@@ -570,7 +570,7 @@ afpacket_pft_t PFT::getNextAFPacket()
if (builder.canAttemptToDecode() == dar_t::yes) {
auto afpacket = builder.extractAF();
- assert(not afpacket.empty());
+ // Empty AF Packet can happen if CRC is wrong
if (m_verbose) {
etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins();
}
@@ -588,7 +588,7 @@ afpacket_pft_t PFT::getNextAFPacket()
auto afpacket = builder.extractAF();
if (afpacket.empty()) {
- etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);
+ etiLog.log(debug, "pseq %d timed out after RS", m_next_pseq);
}
if (m_verbose) {
etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins();
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index 2f20391..c99997a 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -153,24 +153,33 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
while (m_input_data.size() > 2) {
if (m_input_data[0] == 'A' and m_input_data[1] == 'F') {
- const decode_state_t st = decode_afpacket(m_input_data);
-
- if (st.num_bytes_consumed == 0 and not st.complete) {
- // We need to refill our buffer
- break;
+ const auto r = decode_afpacket(m_input_data);
+ bool leave_loop = false;
+ switch (r.st) {
+ case decode_state_e::Ok:
+ m_last_sequences.pseq_valid = false;
+ m_af_packet_completed();
+ break;
+ case decode_state_e::MissingData:
+ /* Continue filling buffer */
+ leave_loop = true;
+ break;
+ case decode_state_e::Error:
+ m_last_sequences.pseq_valid = false;
+ leave_loop = true;
+ break;
}
- if (st.num_bytes_consumed) {
+ if (r.num_bytes_consumed) {
vector<uint8_t> remaining_data;
- copy(m_input_data.begin() + st.num_bytes_consumed,
+ copy(m_input_data.begin() + r.num_bytes_consumed,
m_input_data.end(),
back_inserter(remaining_data));
m_input_data = remaining_data;
}
- m_last_sequences.pseq_valid = false;
- if (st.complete) {
- m_af_packet_completed();
+ if (leave_loop) {
+ break;
}
}
else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') {
@@ -194,12 +203,21 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
auto af = m_pft.getNextAFPacket();
if (not af.af_packet.empty()) {
- const decode_state_t st = decode_afpacket(af.af_packet);
- m_last_sequences.pseq = af.pseq;
- m_last_sequences.pseq_valid = true;
-
- if (st.complete) {
- m_af_packet_completed();
+ const auto r = decode_afpacket(af.af_packet);
+
+ switch (r.st) {
+ case decode_state_e::Ok:
+ m_last_sequences.pseq = af.pseq;
+ m_last_sequences.pseq_valid = true;
+ m_af_packet_completed();
+ break;
+ case decode_state_e::MissingData:
+ etiLog.level(error) << "ETI MissingData on PFT push_bytes";
+ m_last_sequences.pseq_valid = false;
+ break;
+ case decode_state_e::Error:
+ m_last_sequences.pseq_valid = false;
+ break;
}
}
}
@@ -219,10 +237,10 @@ void TagDispatcher::push_packet(const Packet &packet)
}
if (buf[0] == 'A' and buf[1] == 'F') {
- const decode_state_t st = decode_afpacket(buf);
+ const auto r = decode_afpacket(buf);
m_last_sequences.pseq_valid = false;
- if (st.complete) {
+ if (r.st == decode_state_e::Ok) {
m_af_packet_completed();
}
@@ -237,11 +255,11 @@ void TagDispatcher::push_packet(const Packet &packet)
auto af = m_pft.getNextAFPacket();
if (not af.af_packet.empty()) {
- const decode_state_t st = decode_afpacket(af.af_packet);
- m_last_sequences.pseq = af.pseq;
- m_last_sequences.pseq_valid = true;
+ const auto r = decode_afpacket(af.af_packet);
- if (st.complete) {
+ if (r.st == decode_state_e::Ok) {
+ m_last_sequences.pseq = af.pseq;
+ m_last_sequences.pseq_valid = true;
m_af_packet_completed();
}
}
@@ -261,11 +279,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets)
#define AFPACKET_HEADER_LEN 10 // includes SYNC
-decode_state_t TagDispatcher::decode_afpacket(
+TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
const std::vector<uint8_t> &input_data)
{
if (input_data.size() < AFPACKET_HEADER_LEN) {
- return {false, 0};
+ return {decode_state_e::MissingData, 0};
}
// read length from packet
@@ -274,7 +292,7 @@ decode_state_t TagDispatcher::decode_afpacket(
const size_t crclength = 2;
if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) {
- return {false, 0};
+ return {decode_state_e::MissingData, 0};
}
// SEQ wraps at 0xFFFF, unsigned integer overflow is intentional
@@ -291,22 +309,23 @@ decode_state_t TagDispatcher::decode_afpacket(
}
m_last_sequences.seq = seq;
+ const size_t crclen = 2;
bool has_crc = (input_data[8] & 0x80) ? true : false;
uint8_t major_revision = (input_data[8] & 0x70) >> 4;
uint8_t minor_revision = input_data[8] & 0x0F;
if (major_revision != 1 or minor_revision != 0) {
- throw invalid_argument("EDI AF Packet has wrong revision " +
- to_string(major_revision) + "." + to_string(minor_revision));
+ etiLog.level(warn) << "EDI AF Packet has wrong revision " <<
+ (int)major_revision << "." << (int)minor_revision;
+ }
+
+ if (not has_crc) {
+ etiLog.level(warn) << "AF packet not supported, has no CRC";
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength};
}
uint8_t pt = input_data[9];
if (pt != 'T') {
// only support Tag
- return {false, 0};
- }
-
-
- if (not has_crc) {
- throw invalid_argument("AF packet not supported, has no CRC");
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
uint16_t crc = 0xffff;
@@ -318,7 +337,8 @@ decode_state_t TagDispatcher::decode_afpacket(
uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);
if (packet_crc != crc) {
- throw invalid_argument("AF Packet crc wrong");
+ etiLog.level(warn) << "AF Packet crc wrong";
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
else {
vector<uint8_t> payload(taglength);
@@ -326,8 +346,9 @@ decode_state_t TagDispatcher::decode_afpacket(
input_data.begin() + AFPACKET_HEADER_LEN + taglength,
payload.begin());
- return {decode_tagpacket(payload),
- AFPACKET_HEADER_LEN + taglength + 2};
+ return {
+ decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error,
+ AFPACKET_HEADER_LEN + taglength + crclen};
}
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index e8c57c1..5e31984 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -49,13 +49,6 @@ struct frame_timestamp_t {
static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta);
};
-struct decode_state_t {
- decode_state_t(bool _complete, size_t _num_bytes_consumed) :
- complete(_complete), num_bytes_consumed(_num_bytes_consumed) {}
- bool complete;
- size_t num_bytes_consumed;
-};
-
using tag_name_t = std::array<uint8_t, 4>;
std::string tag_name_to_human_readable(const tag_name_t& name);
@@ -122,7 +115,17 @@ class TagDispatcher {
}
private:
- decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data);
+ enum class decode_state_e {
+ Ok, MissingData, Error
+ };
+ struct decode_result_t {
+ decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) :
+ st(_st), num_bytes_consumed(_num_bytes_consumed) {}
+ decode_state_e st;
+ size_t num_bytes_consumed;
+ };
+
+ decode_result_t decode_afpacket(const std::vector<uint8_t> &input_data);
bool decode_tagpacket(const std::vector<uint8_t> &payload);
PFT::PFT m_pft;
diff --git a/src/EDISender.cpp b/src/EDISender.cpp
index 30c7289..171f09a 100644
--- a/src/EDISender.cpp
+++ b/src/EDISender.cpp
@@ -30,6 +30,7 @@
#include "ThreadsafeQueue.h"
#include <cmath>
#include <cstring>
+#include <iomanip>
#include <numeric>
#include <map>
#include <algorithm>
@@ -39,41 +40,51 @@ using namespace std;
EDISender::~EDISender()
{
- running.store(false);
- tagpackets.trigger_wakeup();
+ _running.store(false);
+ _tagpackets.trigger_wakeup();
- if (process_thread.joinable()) {
- process_thread.join();
+ if (_process_thread.joinable()) {
+ _process_thread.join();
}
}
-void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets)
+void EDISender::start(
+ const edi::configuration_t& conf,
+ int delay_ms,
+ bool drop_late,
+ int drop_delay_ms)
{
- edi_conf = conf;
- tist_delay_ms = delay_ms;
- drop_late = drop_late_packets;
+ _edi_conf = conf;
+ _delay_ms = delay_ms;
+ _drop_late = drop_late;
+ _drop_delay_ms = drop_delay_ms;
- edi_sender = make_shared<edi::Sender>(edi_conf);
+ _edi_sender = make_shared<edi::Sender>(_edi_conf);
- running.store(true);
- process_thread = thread(&EDISender::process, this);
+ _running.store(true);
+ _process_thread = thread(&EDISender::process, this);
}
void EDISender::push_tagpacket(tagpacket_t&& tp)
{
- tagpackets.push(move(tp));
+ _tagpackets.push(move(tp));
}
void EDISender::print_configuration()
{
- if (edi_conf.enabled()) {
- edi_conf.print();
+ if (_edi_conf.enabled()) {
+ _edi_conf.print();
}
else {
etiLog.level(info) << "EDI disabled";
}
}
+void EDISender::inhibit_until(std::chrono::steady_clock::time_point tp)
+{
+ _output_inhibit_until = tp;
+}
+
void EDISender::send_tagpacket(tagpacket_t& tp)
{
// Wait until our time is tist_delay after the TIST before
@@ -82,59 +93,68 @@ void EDISender::send_tagpacket(tagpacket_t& tp)
using namespace std::chrono;
const auto t_frame = tp.timestamp.to_system_clock();
- const auto t_release = t_frame + milliseconds(tist_delay_ms);
+ const auto t_release = t_frame + milliseconds(_delay_ms);
+ const auto t_latest_release = t_frame + milliseconds(_drop_delay_ms);
const auto t_now = system_clock::now();
- const bool late = t_release < t_now;
+ const bool slightly_late = t_release < t_now;
+ const bool late = t_latest_release < t_now;
buffering_stat_t stat;
- stat.late = late;
+ stat.late = slightly_late;
- if (not late) {
+ if (not slightly_late) {
const auto wait_time = t_release - t_now;
std::this_thread::sleep_for(wait_time);
}
- stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - tp.received_at).count();
- buffering_stats.push_back(std::move(stat));
+ const auto t_now_steady = steady_clock::now();
+ stat.inhibited = t_now_steady < _output_inhibit_until;
+ stat.dropped = late and _drop_late;
+ stat.buffering_time_us = duration_cast<microseconds>(t_now_steady - tp.received_at).count();
+ _buffering_stats.push_back(std::move(stat));
- if (late and drop_late) {
+ if (late and _drop_late) {
return;
}
- if (edi_sender and edi_conf.enabled()) {
+ if (stat.inhibited) {
+ return;
+ }
+
+ if (_edi_sender and _edi_conf.enabled()) {
edi::TagPacket edi_tagpacket(0);
if (tp.seq.seq_valid) {
- edi_sender->override_af_sequence(tp.seq.seq);
+ _edi_sender->override_af_sequence(tp.seq.seq);
}
if (tp.seq.pseq_valid) {
- edi_sender->override_pft_sequence(tp.seq.pseq);
+ _edi_sender->override_pft_sequence(tp.seq.pseq);
}
else if (tp.seq.seq_valid) {
// If the source isn't using PFT, set PSEQ = SEQ so that multihoming
// with several EDI2EDI instances could work.
- edi_sender->override_pft_sequence(tp.seq.seq);
+ _edi_sender->override_pft_sequence(tp.seq.seq);
}
edi_tagpacket.raw_tagpacket = move(tp.tagpacket);
- edi_sender->write(edi_tagpacket);
+ _edi_sender->write(edi_tagpacket);
}
}
void EDISender::process()
{
- while (running.load()) {
+ while (_running.load()) {
tagpacket_t tagpacket;
try {
- tagpackets.wait_and_pop(tagpacket);
+ _tagpackets.wait_and_pop(tagpacket);
}
catch (const ThreadsafeQueueWakeup&) {
break;
}
- if (not running.load()) {
+ if (not _running.load()) {
break;
}
@@ -143,15 +163,21 @@ void EDISender::process()
send_tagpacket(tagpacket);
if (dlfc % 250 == 0) { // every six seconds
- const double n = buffering_stats.size();
+ const double n = _buffering_stats.size();
- size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(),
+ size_t num_late = std::count_if(_buffering_stats.begin(), _buffering_stats.end(),
[](const buffering_stat_t& s){ return s.late; });
+ size_t num_dropped = std::count_if(_buffering_stats.begin(), _buffering_stats.end(),
+ [](const buffering_stat_t& s){ return s.dropped; });
+
+ size_t num_inhibited = std::count_if(_buffering_stats.begin(), _buffering_stats.end(),
+ [](const buffering_stat_t& s){ return s.inhibited; });
+
double sum = 0.0;
double min = std::numeric_limits<double>::max();
double max = -std::numeric_limits<double>::max();
- for (const auto& s : buffering_stats) {
+ for (const auto& s : _buffering_stats) {
// convert to milliseconds
const double t = s.buffering_time_us / 1000.0;
sum += t;
@@ -167,7 +193,7 @@ void EDISender::process()
double mean = sum / n;
double sq_sum = 0;
- for (const auto& s : buffering_stats) {
+ for (const auto& s : _buffering_stats) {
const double t = s.buffering_time_us / 1000.0;
sq_sum += (t-mean) * (t-mean);
}
@@ -176,24 +202,24 @@ void EDISender::process()
/* Debug code
stringstream ss;
ss << "times:";
- for (const auto t : buffering_stats) {
+ for (const auto t : _buffering_stats) {
ss << " " << lrint(t.buffering_time_us / 1000.0);
}
etiLog.level(debug) << ss.str();
// */
- etiLog.level(info) << "Buffering time statistics [milliseconds]:"
+ etiLog.level(info) << "Buffering time statistics for " <<
+ _buffering_stats.size() << " frames [milliseconds]:" <<
" min: " << min <<
" max: " << max <<
" mean: " << mean <<
" stdev: " << stdev <<
- " late: " <<
- num_late << " of " << buffering_stats.size() << " (" <<
- num_late * 100.0 / n << "%)" <<
+ " late: " << num_late <<
+ " dropped: " << num_dropped <<
+ " inhibited: " << num_inhibited <<
" Frame 0 TS " << ((double)tsta / 16384.0);
-
- buffering_stats.clear();
+ _buffering_stats.clear();
}
}
}
diff --git a/src/EDISender.h b/src/EDISender.h
index f5a14b2..91478f5 100644
--- a/src/EDISender.h
+++ b/src/EDISender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2021
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -54,28 +54,37 @@ class EDISender {
EDISender& operator=(const EDISender& other) = delete;
~EDISender();
void start(const edi::configuration_t& conf,
- int delay_ms, bool drop_late_packets);
+ int delay_ms,
+ bool drop_late,
+ int drop_delay_ms);
void push_tagpacket(tagpacket_t&& tagpacket);
void print_configuration(void);
+ void inhibit_until(std::chrono::steady_clock::time_point tp);
+
private:
void send_tagpacket(tagpacket_t& frame);
void process(void);
- int tist_delay_ms;
- bool drop_late;
- std::atomic<bool> running;
- std::thread process_thread;
- edi::configuration_t edi_conf;
- ThreadsafeQueue<tagpacket_t> tagpackets;
+ std::chrono::steady_clock::time_point _output_inhibit_until = std::chrono::steady_clock::now();
+
+ edi::configuration_t _edi_conf;
+ int _delay_ms;
+ bool _drop_late;
+ int _drop_delay_ms;
+ std::atomic<bool> _running;
+ std::thread _process_thread;
+ ThreadsafeQueue<tagpacket_t> _tagpackets;
- std::shared_ptr<edi::Sender> edi_sender;
+ std::shared_ptr<edi::Sender> _edi_sender;
struct buffering_stat_t {
// Time between when we received the packets and when we transmit packets, in microseconds
double buffering_time_us = 0.0;
bool late = false;
+ bool dropped = false;
+ bool inhibited = false;
};
- std::vector<buffering_stat_t> buffering_stats;
+ std::vector<buffering_stat_t> _buffering_stats;
};
diff --git a/src/main.cpp b/src/main.cpp
index 6714052..f30f460 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2021
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -64,9 +64,9 @@ static void usage()
cerr << " -c <host:port> Connect to given host and port using TCP.\n";
cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time.\n";
cerr << " Negative delay values are also allowed.\n";
+ cerr << " -x <drop_delay> Drop frames where for which are too late, defined by the drop delay.\n";
cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0.\n";
cerr << " This is useful for checking that NTP is properly synchronised\n";
- cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late.\n";
cerr << " -P Disable PFT and send AFPackets.\n";
cerr << " -f <fec> Set the FEC.\n";
cerr << " -i <interleave> Configure the interleaver with given interleave percentage: 0% send all fragments at once, 100% spread over 24ms, >100% spread and interleave. Default 95%\n";
@@ -135,7 +135,7 @@ class Main : public EdiDecoder::ETIDataCollector {
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "c:C:d:p:s:S:t:Pf:i:Dva:b:w:xh");
+ ch = getopt(argc, argv, "c:C:d:p:s:S:t:Pf:i:Dva:b:w:x:h");
switch (ch) {
case -1:
break;
@@ -180,13 +180,14 @@ class Main : public EdiDecoder::ETIDataCollector {
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
case 'b':
- backoff_after_reset_ms = std::stoi(optarg);
+ backoff = std::chrono::milliseconds(std::stoi(optarg));
break;
case 'w':
delay_ms = std::stoi(optarg);
break;
case 'x':
drop_late_packets = true;
+ drop_delay_ms = std::stoi(optarg);
break;
case 'h':
default:
@@ -236,9 +237,9 @@ class Main : public EdiDecoder::ETIDataCollector {
}
etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " <<
- (drop_late_packets ? "Will" : "Will not") << " drop late packets";
+ (drop_late_packets ? "Will" : "Will not") << " drop late packets (" << drop_delay_ms << " ms)";
- edisender.start(edi_conf, delay_ms, drop_late_packets);
+ edisender.start(edi_conf, delay_ms, drop_late_packets, drop_delay_ms);
edisender.print_configuration();
try {
@@ -250,12 +251,11 @@ class Main : public EdiDecoder::ETIDataCollector {
if (not running) {
break;
}
- etiLog.level(info) << "Source disconnected, backoff " << backoff_after_reset_ms << "ms...";
+ etiLog.level(info) << "Source disconnected, reconnecting and enabling output inhibit backoff";
+ edisender.inhibit_until(chrono::steady_clock::now() + backoff);
// There is no state inside the edisender or inside Main that we need to
// clear.
-
- this_thread::sleep_for(chrono::milliseconds(backoff_after_reset_ms));
}
}
catch (const std::runtime_error& e) {
@@ -372,7 +372,8 @@ class Main : public EdiDecoder::ETIDataCollector {
edi::configuration_t edi_conf;
int delay_ms = 500;
bool drop_late_packets = false;
- uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
+ int drop_delay_ms = 0;
+ std::chrono::steady_clock::duration backoff = std::chrono::milliseconds(DEFAULT_BACKOFF);
std::string startupcheck;
std::string source;