aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2020-05-06 15:24:49 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2020-05-06 15:24:49 +0200
commit44e414f0aad8e98e5187baf190412e164ff9942e (patch)
tree08e37375436daff36d4462fd85f14f869dcf4cec /src
parentd633c266b24f4eada1ed0cf8821b572980ae1b2f (diff)
downloadODR-EDI2EDI-44e414f0aad8e98e5187baf190412e164ff9942e.tar.gz
ODR-EDI2EDI-44e414f0aad8e98e5187baf190412e164ff9942e.tar.bz2
ODR-EDI2EDI-44e414f0aad8e98e5187baf190412e164ff9942e.zip
Add and integrate EDISender from zmq2edi
Diffstat (limited to 'src')
-rw-r--r--src/EDISender.cpp180
-rw-r--r--src/EDISender.h79
-rw-r--r--src/main.cpp20
3 files changed, 270 insertions, 9 deletions
diff --git a/src/EDISender.cpp b/src/EDISender.cpp
new file mode 100644
index 0000000..e5f0a50
--- /dev/null
+++ b/src/EDISender.cpp
@@ -0,0 +1,180 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2020
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of the ODR-mmbTools.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "EDISender.h"
+#include "Log.h"
+#include "ThreadsafeQueue.h"
+#include <cmath>
+#include <cstring>
+#include <numeric>
+#include <map>
+#include <algorithm>
+#include <limits>
+
+using namespace std;
+
+EDISender::~EDISender()
+{
+ if (running.load()) {
+ running.store(false);
+ tagpackets.trigger_wakeup();
+ process_thread.join();
+ }
+}
+
+void EDISender::start(const edi::configuration_t& conf, int delay_ms, bool drop_late_packets)
+{
+ edi_conf = conf;
+ tist_delay_ms = delay_ms;
+ drop_late = drop_late_packets;
+
+ edi_sender = make_shared<edi::Sender>(edi_conf);
+
+ running.store(true);
+ process_thread = thread(&EDISender::process, this);
+}
+
+void EDISender::push_tagpacket(tagpacket_t&& tp)
+{
+ tagpackets.push(move(tp));
+}
+
+void EDISender::print_configuration()
+{
+ if (edi_conf.enabled()) {
+ edi_conf.print();
+ }
+ else {
+ etiLog.level(info) << "EDI disabled";
+ }
+}
+
+void EDISender::send_tagpacket(tagpacket_t& tp)
+{
+ // Wait until our time is tist_delay after the TIST before
+ // we release that frame
+
+ using namespace std::chrono;
+
+ const auto t_frame = tp.timestamp.to_system_clock();
+ const auto t_release = t_frame + milliseconds(tist_delay_ms);
+ const auto t_now = system_clock::now();
+
+ const bool late = t_release < t_now;
+
+ buffering_stat_t stat;
+ stat.late = late;
+
+ if (not late) {
+ const auto wait_time = t_release - t_now;
+ std::this_thread::sleep_for(wait_time);
+ }
+
+ stat.buffering_time_us = duration_cast<microseconds>(steady_clock::now() - tp.received_at).count();
+ buffering_stats.push_back(std::move(stat));
+
+ if (late and drop_late) {
+ return;
+ }
+
+ if (edi_sender and edi_conf.enabled()) {
+ edi::TagPacket edi_tagpacket(0);
+ edi_tagpacket.raw_tagpacket = move(tp.tagpacket);
+ edi_sender->write(edi_tagpacket);
+ }
+}
+
+void EDISender::process()
+{
+ while (running.load()) {
+ tagpacket_t tagpacket;
+ try {
+ tagpackets.wait_and_pop(tagpacket);
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+
+ if (not running.load()) {
+ break;
+ }
+
+ send_tagpacket(tagpacket);
+
+ if (buffering_stats.size() == 250) { // every six seconds
+ const double n = buffering_stats.size();
+
+ size_t num_late = std::count_if(buffering_stats.begin(), buffering_stats.end(),
+ [](const buffering_stat_t& s){ return s.late; });
+
+ double sum = 0.0;
+ double min = std::numeric_limits<double>::max();
+ double max = -std::numeric_limits<double>::max();
+ for (const auto& s : buffering_stats) {
+ // convert to milliseconds
+ const double t = s.buffering_time_us / 1000.0;
+ sum += t;
+
+ if (t < min) {
+ min = t;
+ }
+
+ if (t > max) {
+ max = t;
+ }
+ }
+ double mean = sum / n;
+
+ double sq_sum = 0;
+ for (const auto& s : buffering_stats) {
+ const double t = s.buffering_time_us / 1000.0;
+ sq_sum += (t-mean) * (t-mean);
+ }
+ double stdev = sqrt(sq_sum / n);
+
+ /* Debug code
+ stringstream ss;
+ ss << "times:";
+ for (const auto t : buffering_stats) {
+ ss << " " << lrint(t.buffering_time_us / 1000.0);
+ }
+ etiLog.level(debug) << ss.str();
+ // */
+
+ etiLog.level(info) << "Buffering time statistics [milliseconds]:"
+ " min: " << min <<
+ " max: " << max <<
+ " mean: " << mean <<
+ " stdev: " << stdev <<
+ " late: " <<
+ num_late << " of " << buffering_stats.size() << " (" <<
+ num_late * 100.0 / n << "%)";
+
+ buffering_stats.clear();
+ }
+ }
+}
diff --git a/src/EDISender.h b/src/EDISender.h
new file mode 100644
index 0000000..5f053ea
--- /dev/null
+++ b/src/EDISender.h
@@ -0,0 +1,79 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2020
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of the ODR-mmbTools.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+#include <iostream>
+#include <iterator>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <atomic>
+#include "ThreadsafeQueue.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/Transport.h"
+#include "edi/common.hpp"
+
+
+struct tagpacket_t {
+ std::vector<uint8_t> tagpacket;
+ EdiDecoder::frame_timestamp_t timestamp;
+ std::chrono::steady_clock::time_point received_at;
+};
+
+class EDISender {
+ public:
+ EDISender() = default;
+ EDISender(const EDISender& other) = delete;
+ EDISender& operator=(const EDISender& other) = delete;
+ ~EDISender();
+ void start(const edi::configuration_t& conf,
+ int delay_ms, bool drop_late_packets);
+ void push_tagpacket(tagpacket_t&& tagpacket);
+ void print_configuration(void);
+
+ private:
+ void send_tagpacket(tagpacket_t& frame);
+ void process(void);
+
+ int tist_delay_ms;
+ bool drop_late;
+ std::atomic<bool> running;
+ std::thread process_thread;
+ edi::configuration_t edi_conf;
+ ThreadsafeQueue<tagpacket_t> tagpackets;
+
+ std::shared_ptr<edi::Sender> edi_sender;
+
+ struct buffering_stat_t {
+ // Time between when we received the packets and when we transmit packets, in microseconds
+ double buffering_time_us = 0.0;
+ bool late = false;
+ };
+ std::vector<buffering_stat_t> buffering_stats;
+
+};
diff --git a/src/main.cpp b/src/main.cpp
index 9dc6011..43cae90 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -5,6 +5,8 @@
http://www.opendigitalradio.org
*/
/*
+ This file is part of the ODR-mmbTools.
+
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
@@ -29,6 +31,7 @@
#include <thread>
#include <vector>
#include "Log.h"
+#include "EDISender.h"
#include "edioutput/TagItems.h"
#include "edioutput/TagPacket.h"
#include "edioutput/Transport.h"
@@ -112,12 +115,11 @@ class Main : public EdiDecoder::ETIDataCollector {
// Tell the ETIWriter that the AFPacket is complete
virtual void assemble(EdiDecoder::ReceivedTagPacket&& tag_data) override
{
- etiLog.level(info) << "Received tagpacket " << tag_data.tagpacket.size() << " bytes at " <<
- tag_data.timestamp.to_string();
-
- edi::TagPacket edi_tagpacket(0);
- edi_tagpacket.raw_tagpacket = move(tag_data.tagpacket);
- edi_sender->write(edi_tagpacket);
+ tagpacket_t tp;
+ tp.tagpacket = move(tag_data.tagpacket);
+ tp.received_at = std::chrono::steady_clock::now();
+ tp.timestamp = move(tag_data.timestamp);
+ edisender.push_tagpacket(move(tp));
}
int start(int argc, char **argv)
@@ -243,13 +245,13 @@ class Main : public EdiDecoder::ETIDataCollector {
return 1;
}
- edi_sender = make_shared<edi::Sender>(edi_conf);
-
edi_decoder.set_verbose(edi_conf.verbose);
etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " <<
(drop_late_packets ? "Will" : "Will not") << " drop late packets";
+ edisender.start(edi_conf, delay_ms, drop_late_packets);
+ edisender.print_configuration();
Socket::TCPSocket sock;
etiLog.level(info) << "Connecting to TCP " << connect_to_host << ":" << connect_to_port;
@@ -341,7 +343,7 @@ class Main : public EdiDecoder::ETIDataCollector {
std::string startupcheck;
std::string source;
- std::shared_ptr<edi::Sender> edi_sender;
+ EDISender edisender;
};