aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 09:49:54 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 09:49:54 +0100
commit9556078164353ea1b80bebb386ae4d8ccd4c543d (patch)
tree1b61500ad55b26f47ad04babe7e062466ffc978f
parentf7f3b848a57771948bd65f632e11501a5c2ca775 (diff)
downloadodr-edilib-9556078164353ea1b80bebb386ae4d8ccd4c543d.tar.gz
odr-edilib-9556078164353ea1b80bebb386ae4d8ccd4c543d.tar.bz2
odr-edilib-9556078164353ea1b80bebb386ae4d8ccd4c543d.zip
Add threaded UDP input to reduce packet loss risk
-rw-r--r--CMakeLists.txt5
-rw-r--r--decoder/ETIDecoder.cpp2
-rw-r--r--decoder/PFT.cpp32
-rw-r--r--decoder/PFT.hpp11
-rw-r--r--test/ThreadsafeQueue.h150
-rw-r--r--test/UdpSocket.h4
-rw-r--r--test/main.cpp91
7 files changed, 254 insertions, 41 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c07381e..3515d17 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -26,6 +26,9 @@ set(VERSION_INFO_MAINT_VERSION git)
# Compiler specific setup
########################################################################
+set(THREADS_PREFER_PTHREAD_FLAG ON)
+find_package(Threads REQUIRED)
+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -W -Wall -std=c++11")
@@ -62,7 +65,7 @@ list(APPEND edilib_c_sources
set_source_files_properties(${edilib_cpp_sources} PROPERTIES LANGUAGE "CXX")
set_source_files_properties(${edilib_c_sources} PROPERTIES LANGUAGE "C")
add_executable(edilib ${edilib_cpp_sources} ${edilib_c_sources})
-target_link_libraries(edilib ${FAAD_LIBRARIES})
+target_link_libraries(edilib ${FAAD_LIBRARIES} Threads::Threads)
install(TARGETS edilib DESTINATION bin)
diff --git a/decoder/ETIDecoder.cpp b/decoder/ETIDecoder.cpp
index 939706b..78e0ea6 100644
--- a/decoder/ETIDecoder.cpp
+++ b/decoder/ETIDecoder.cpp
@@ -406,7 +406,7 @@ bool ETIDecoder::decode_estn(const vector<uint8_t> &value, uint8_t n)
return true;
}
-bool ETIDecoder::decode_stardmy(const vector<uint8_t> &value)
+bool ETIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/)
{
return true;
}
diff --git a/decoder/PFT.cpp b/decoder/PFT.cpp
index 4bd1030..4348f14 100644
--- a/decoder/PFT.cpp
+++ b/decoder/PFT.cpp
@@ -445,6 +445,22 @@ std::vector<uint8_t> AFBuilder::extractAF() const
return _af_packet;
}
+std::string AFBuilder::visualise() const
+{
+ stringstream ss;
+ ss << "|";
+ for (size_t i = 0; i < _Fcount; i++) {
+ if (_fragments.count(i)) {
+ ss << ".";
+ }
+ else {
+ ss << " ";
+ }
+ }
+ ss << "| " << AFBuilder::dar_to_string(canAttemptToDecode()) << " " << lifeTime;
+ return ss.str();
+}
+
void PFT::pushPFTFrag(const Fragment &fragment)
{
// Start decoding the first pseq we receive. In normal
@@ -473,11 +489,12 @@ void PFT::pushPFTFrag(const Fragment &fragment)
p.pushPFTFrag(fragment);
#if 0
- etiLog.log(debug, "After new frag with pseq %u, afbuilders: ", fragment.Pseq());
+ etiLog.log(debug, "Got frag %u:%u, afbuilders: ",
+ fragment.Pseq(), fragment.Findex());
for (const auto &k : m_afbuilders) {
- etiLog.log(debug, "%u ", k.first);
+ const bool isNextPseq = (m_next_pseq == k.first);
+ etiLog.level(debug) << (isNextPseq ? "->" : " ") << k.first << " " << k.second.visualise();
}
- etiLog.log(debug, "\n");
#endif
}
@@ -494,21 +511,16 @@ std::vector<uint8_t> PFT::getNextAFPacket()
}
auto &builder = m_afbuilders.at(m_next_pseq);
- //const auto nf = builder.numberOfFragments();
using dar_t = AFBuilder::decode_attempt_result_t;
if (builder.canAttemptToDecode() == dar_t::yes) {
- //etiLog.log(debug, "pseq %d (%d %d/%d) yes\n",
- // m_next_pseq, lt, nf.first, nf.second);
auto afpacket = builder.extractAF();
assert(not afpacket.empty());
incrementNextPseq();
return afpacket;
}
else if (builder.canAttemptToDecode() == dar_t::maybe) {
- //etiLog.log(debug, "pseq %d (%d %d/%d) maybe\n",
- // m_next_pseq, lt, nf.first, nf.second);
if (builder.lifeTime > 0) {
builder.lifeTime--;
}
@@ -518,15 +530,13 @@ std::vector<uint8_t> PFT::getNextAFPacket()
auto afpacket = builder.extractAF();
if (afpacket.empty()) {
- etiLog.log(debug,"pseq %d timed out after RS\n", m_next_pseq);
+ etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);
}
incrementNextPseq();
return afpacket;
}
}
else {
- //etiLog.log(debug, "pseq %d (%d %d/%d) no\n",
- // m_next_pseq, lt, nf.first, nf.second);
if (builder.lifeTime > 0) {
builder.lifeTime--;
}
diff --git a/decoder/PFT.hpp b/decoder/PFT.hpp
index 9b65d2e..1a2f617 100644
--- a/decoder/PFT.hpp
+++ b/decoder/PFT.hpp
@@ -83,6 +83,15 @@ class AFBuilder
no, // Not enough fragments present to permit RS
};
+ static std::string dar_to_string(decode_attempt_result_t dar) {
+ switch (dar) {
+ case decode_attempt_result_t::yes: return "y";
+ case decode_attempt_result_t::no: return "n";
+ case decode_attempt_result_t::maybe: return "m";
+ }
+ return "?";
+ }
+
AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime);
void pushPFTFrag(const Fragment &frag);
@@ -101,6 +110,8 @@ class AFBuilder
return {_fragments.size(), _Fcount};
}
+ std::string visualise(void) const;
+
/* The user of this instance can keep track of the lifetime of this
* builder
*/
diff --git a/test/ThreadsafeQueue.h b/test/ThreadsafeQueue.h
new file mode 100644
index 0000000..911fdc8
--- /dev/null
+++ b/test/ThreadsafeQueue.h
@@ -0,0 +1,150 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available.
+ */
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ size_t queue_size = 0;
+ {
+ std::lock_guard<std::mutex> lock(the_mutex);
+ the_queue.push(val);
+ queue_size = the_queue.size();
+ }
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ size_t queue_size = 0;
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ queue_size = the_queue.size();
+ }
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ std::lock_guard<std::mutex> lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ std::lock_guard<std::mutex> lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ {
+ std::lock_guard<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ }
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering) {
+ the_rx_notification.wait(lock);
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ }
+ the_tx_notification.notify_one();
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
+};
+
+
diff --git a/test/UdpSocket.h b/test/UdpSocket.h
index f62db1d..ba8f383 100644
--- a/test/UdpSocket.h
+++ b/test/UdpSocket.h
@@ -140,10 +140,6 @@ class UdpPacket
*/
UdpPacket();
UdpPacket(size_t initSize);
- UdpPacket(const UdpPacket& packet) = delete;
- const UdpPacket& operator=(const UdpPacket&) = delete;
- UdpPacket(const UdpPacket&& packet) = delete;
- const UdpPacket& operator=(const UdpPacket&&) = delete;
/** Give the pointer to data.
* @return The pointer
diff --git a/test/main.cpp b/test/main.cpp
index b051429..913622b 100644
--- a/test/main.cpp
+++ b/test/main.cpp
@@ -25,10 +25,13 @@
#include <cstdlib>
#include <string>
#include <regex>
+#include <thread>
+#include <atomic>
#include "ETIDecoder.hpp"
#include "ETIWriter.hpp"
#include "UdpSocket.h"
+#include "ThreadsafeQueue.h"
struct options_t {
std::string edi_source;
@@ -40,6 +43,57 @@ struct options_t {
int max_delay;
};
+class UdpReceiver {
+ public:
+ UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ ~UdpReceiver() {
+ m_stop = true;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+ }
+
+ void start(int port) {
+ m_port = port;
+ m_thread = std::thread(&UdpReceiver::m_run, this);
+ }
+
+ std::vector<uint8_t> get_packet_buffer(void) {
+ UdpPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.getBuffer();
+ }
+
+ private:
+ void m_run(void) {
+ UdpSocket sock(m_port);
+
+ const size_t packsize = 8192;
+ UdpPacket packet(packsize);
+
+ while (not m_stop) {
+ int ret = sock.receive(packet);
+ if (ret == 0) {
+ if (packet.getSize() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ m_packets.push(packet);
+ }
+ else
+ {
+ fprintf(stderr, "Socket error: %s\n", inetErrMsg);
+ m_stop = true;
+ }
+ }
+ }
+
+ int m_port;
+ std::thread m_thread;
+ std::atomic<bool> m_stop;
+ ThreadsafeQueue<UdpPacket> m_packets;
+};
+
static void printUsage(char *name)
{
fprintf(stderr, "Usage:\n %s <edi_source> [-o eti_file] [-e inv_rate] [-m delay]\n", name);
@@ -121,38 +175,27 @@ int main(int argc, char **argv)
const int udp_port = std::stoi(m[1].str());
fprintf(stderr, "Receiving from udp port %d\n", udp_port);
- UdpSocket sock(udp_port);
+ UdpReceiver rx;
+ rx.start(udp_port);
- const size_t packsize = 8192;
- UdpPacket packet(packsize);
while (true) {
- int ret = sock.receive(packet);
- if (ret == 0) {
- const auto &buf = packet.getBuffer();
- if (packet.getSize() == packsize) {
- fprintf(stderr, "Warning, possible UDP truncation\n");
- }
+ const auto &buf = rx.get_packet_buffer();
- if (options.enable_packet_loss) {
- int rand_num = rand() % options.packet_loss_inv_rate;
- if (rand_num != 0) {
- decoder.push_packet(buf);
- }
- }
- else {
+ if (options.enable_packet_loss) {
+ int rand_num = rand() % options.packet_loss_inv_rate;
+ if (rand_num != 0) {
decoder.push_packet(buf);
}
-
- const auto frame = writer.getEtiFrame();
- if (fd_eti and not frame.empty()) {
- fwrite(frame.data(), frame.size(), 1, fd_eti);
- }
-
}
else {
- fprintf(stderr, "Socket error: %s\n", inetErrMsg);
- break;
+ decoder.push_packet(buf);
}
+
+ const auto frame = writer.getEtiFrame();
+ if (fd_eti and not frame.empty()) {
+ fwrite(frame.data(), frame.size(), 1, fd_eti);
+ }
+
}
}
else {