aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2020-10-28 11:35:37 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2020-10-28 11:35:37 +0100
commitab1f13094dd9f1c5a0e75b51c83500334477e2ad (patch)
treec5532eb7910384f8fb10bb11f0876f43f1403056
parenta1ced9ab4e9b8026cfc3f3c6c1b7bf10fab969fc (diff)
downloadodr-edilib-master.tar.gz
odr-edilib-master.tar.bz2
odr-edilib-master.zip
Apply some updates from common and update READMEHEADmaster
-rw-r--r--README.md11
-rw-r--r--ReedSolomon.cpp4
-rw-r--r--Socket.cpp126
-rw-r--r--Socket.h44
-rw-r--r--ThreadsafeQueue.h127
-rw-r--r--crc.c1
-rw-r--r--decoder/common.cpp14
-rw-r--r--decoder/common.hpp4
8 files changed, 250 insertions, 81 deletions
diff --git a/README.md b/README.md
index f3d3955..c0fe901 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,12 @@
EDI receiver library
====================
-This project contains an EDI receiver library, to be used
-later in other projects.
+This project contains an EDI receiver library, it's the
+same code as in the other ODR-mmbTools, with a little
+wrapper around it for debugging purposes.
+
+It can convert EDI to ETI-NI, EDI to STI, and can also decode AAC+ inside STI to a wav file.
+
+Source: file, UDP or TCP
+
+See `test/main.cpp` for more.
diff --git a/ReedSolomon.cpp b/ReedSolomon.cpp
index 38d8ea8..1bf0b24 100644
--- a/ReedSolomon.cpp
+++ b/ReedSolomon.cpp
@@ -64,7 +64,9 @@ ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot,
ReedSolomon::~ReedSolomon()
{
- free_rs_char(rsData);
+ if (rsData != nullptr) {
+ free_rs_char(rsData);
+ }
}
diff --git a/Socket.cpp b/Socket.cpp
index cd70a8e..d41ed1c 100644
--- a/Socket.cpp
+++ b/Socket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -69,7 +69,8 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
UDPPacket::UDPPacket() { }
UDPPacket::UDPPacket(size_t initSize) :
- buffer(initSize)
+ buffer(initSize),
+ address()
{ }
@@ -198,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)
// This suppresses the -Wlogical-op warning
#if EAGAIN == EWOULDBLOCK
- if (errno == EAGAIN) {
+ if (errno == EAGAIN)
#else
- if (errno == EAGAIN or errno == EWOULDBLOCK) {
+ if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
+ {
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -381,7 +383,7 @@ bool TCPSocket::valid() const
return m_sock != -1;
}
-void TCPSocket::connect(const std::string& hostname, int port)
+void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
{
if (m_sock != INVALID_SOCKET) {
throw std::logic_error("You may only connect an invalid TCPSocket");
@@ -415,10 +417,21 @@ void TCPSocket::connect(const std::string& hostname, int port)
if (sfd == -1)
continue;
+ if (nonblock) {
+ int flags = fcntl(sfd, F_GETFL);
+ if (flags == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not get socket flags: " + errstr);
+ }
+
+ if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+ }
+
int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) {
- // As the TCPClient could set the socket to nonblocking, we
- // must handle EINPROGRESS here
m_sock = sfd;
break;
}
@@ -479,6 +492,11 @@ void TCPSocket::listen(int port, const string& name)
continue;
}
+ int reuse_setting = 1;
+ if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == -1) {
+ throw runtime_error("Can't reuse address");
+ }
+
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
m_sock = sfd;
break;
@@ -612,8 +630,13 @@ ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
{
ssize_t ret = ::recv(m_sock, buffer, length, flags);
if (ret == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive error: " + errstr);
+ if (errno == EINTR) {
+ throw Interrupted();
+ }
+ else {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
}
return ret;
}
@@ -633,7 +656,7 @@ ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
std::string errstr(strerror(errno));
throw std::runtime_error("TCP receive with poll() error: " + errstr);
}
- else if (retval > 0 and (fds[0].revents | POLLIN)) {
+ else if (retval > 0 and (fds[0].revents & POLLIN)) {
ssize_t ret = ::recv(m_sock, buffer, length, flags);
if (ret == -1) {
if (errno == ECONNREFUSED) {
@@ -673,9 +696,6 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
if (ret == 0) {
m_sock.close();
-
- TCPSocket newsock;
- m_sock = std::move(newsock);
reconnect();
}
@@ -693,13 +713,9 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
void TCPClient::reconnect()
{
- int flags = fcntl(m_sock.m_sock, F_GETFL);
- if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
- }
-
- m_sock.connect(m_hostname, m_port);
+ TCPSocket newsock;
+ m_sock = std::move(newsock);
+ m_sock.connect(m_hostname, m_port, true);
}
TCPConnection::TCPConnection(TCPSocket&& sock) :
@@ -723,7 +739,9 @@ TCPConnection::~TCPConnection()
m_running = false;
vector<uint8_t> termination_marker;
queue.push(termination_marker);
- m_sender_thread.join();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
}
void TCPConnection::process()
@@ -895,4 +913,70 @@ void TCPReceiveServer::process()
}
}
+TCPSendClient::TCPSendClient(const std::string& hostname, int port) :
+ m_hostname(hostname),
+ m_port(port),
+ m_running(true)
+{
+ m_sender_thread = std::thread(&TCPSendClient::process, this);
+}
+
+TCPSendClient::~TCPSendClient()
+{
+ m_running = false;
+ m_queue.trigger_wakeup();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
+}
+
+void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ m_queue.push(buffer);
+
+ if (m_queue.size() > MAX_QUEUE_SIZE) {
+ vector<uint8_t> discard;
+ m_queue.try_pop(discard);
+ }
+}
+
+void TCPSendClient::process()
+{
+ try {
+ while (m_running) {
+ if (m_is_connected) {
+ try {
+ vector<uint8_t> incoming;
+ m_queue.wait_and_pop(incoming);
+ if (m_sock.sendall(incoming.data(), incoming.size()) == -1) {
+ m_is_connected = false;
+ m_sock = TCPSocket();
+ }
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+ }
+ else {
+ try {
+ m_sock.connect(m_hostname, m_port);
+ m_is_connected = true;
+ }
+ catch (const runtime_error& e) {
+ m_is_connected = false;
+ this_thread::sleep_for(chrono::seconds(1));
+ }
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ m_exception_data = e.what();
+ m_running = false;
+ }
+}
+
}
diff --git a/Socket.h b/Socket.h
index 8bb7fe1..8881be3 100644
--- a/Socket.h
+++ b/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -50,7 +50,7 @@
namespace Socket {
struct InetAddress {
- struct sockaddr_storage addr;
+ struct sockaddr_storage addr = {};
struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
@@ -162,7 +162,7 @@ class TCPSocket {
TCPSocket& operator=(TCPSocket&& other);
bool valid(void) const;
- void connect(const std::string& hostname, int port);
+ void connect(const std::string& hostname, int port, bool nonblock = false);
void listen(int port, const std::string& name);
void close(void);
@@ -180,12 +180,12 @@ class TCPSocket {
*/
ssize_t send(const void* data, size_t size, int timeout_ms=0);
- /* Returns number of bytes read, 0 on disconnect. Throws a
- * runtime_error on error */
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect.
+ * Throws Interrupted on EINTR, runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags);
class Timeout {};
- class Interrupted {};
/* Returns number of bytes read, 0 on disconnect or refused connection.
* Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error
@@ -258,7 +258,7 @@ class TCPDataDispatcher
size_t m_max_queue_size;
- std::atomic<bool> m_running;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
@@ -285,10 +285,38 @@ class TCPReceiveServer {
size_t m_blocksize = 0;
ThreadsafeQueue<std::vector<uint8_t> > m_queue;
- std::atomic<bool> m_running;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
};
+/* A TCP client that abstracts the handling of connects and disconnects.
+ */
+class TCPSendClient {
+ public:
+ TCPSendClient(const std::string& hostname, int port);
+ ~TCPSendClient();
+
+ /* Throws a runtime_error on error
+ */
+ void sendall(const std::vector<uint8_t>& buffer);
+
+ private:
+ void process();
+
+ std::string m_hostname;
+ int m_port;
+
+ bool m_is_connected = false;
+
+ TCPSocket m_sock;
+ static constexpr size_t MAX_QUEUE_SIZE = 512;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_sender_thread;
+ TCPSocket m_listener_socket;
+};
+
}
diff --git a/ThreadsafeQueue.h b/ThreadsafeQueue.h
index 911fdc8..815dfe0 100644
--- a/ThreadsafeQueue.h
+++ b/ThreadsafeQueue.h
@@ -2,47 +2,49 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
- An implementation for a threadsafe queue
+ An implementation for a threadsafe queue, depends on C++11
When creating a ThreadsafeQueue, one can specify the minimal number
of elements it must contain before it is possible to take one
element out.
*/
/*
- This file is part of ODR-DabMux.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
-#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
+#include <utility>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
* The queue can make the consumer block until an element
- * is available.
+ * is available, or a wakeup requested.
*/
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
template<typename T>
class ThreadsafeQueue
{
@@ -50,16 +52,41 @@ public:
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
+ * if max_size > 0 and the queue already contains at least max_size elements,
+ * the element gets discarded.
+ *
* returns the new queue size.
*/
- size_t push(T const& val)
+ size_t push(T const& val, size_t max_size = 0)
{
- size_t queue_size = 0;
- {
- std::lock_guard<std::mutex> lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
+ size_t queue_size_before = the_queue.size();
+ if (max_size == 0) {
the_queue.push(val);
- queue_size = the_queue.size();
}
+ else if (queue_size_before < max_size) {
+ the_queue.push(val);
+ }
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ size_t push(T&& val, size_t max_size = 0)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ size_t queue_size_before = the_queue.size();
+ if (max_size == 0) {
+ the_queue.emplace(std::move(val));
+ }
+ else if (queue_size_before < max_size) {
+ the_queue.emplace(std::move(val));
+ }
+ size_t queue_size = the_queue.size();
+ lock.unlock();
the_rx_notification.notify_one();
@@ -75,21 +102,30 @@ public:
*/
size_t push_wait_if_full(T const& val, size_t threshold)
{
- size_t queue_size = 0;
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- queue_size = the_queue.size();
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
}
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
the_rx_notification.notify_one();
return queue_size;
}
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
/* Send a notification for the receiver thread */
void notify(void)
{
@@ -98,28 +134,27 @@ public:
bool empty() const
{
- std::lock_guard<std::mutex> lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
size_t size() const
{
- std::lock_guard<std::mutex> lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
- {
- std::lock_guard<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
+ std::unique_lock<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
- popped_value = the_queue.front();
- the_queue.pop();
+ popped_value = the_queue.front();
+ the_queue.pop();
- }
+ lock.unlock();
the_tx_notification.notify_one();
return true;
@@ -127,17 +162,23 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering) {
- the_rx_notification.wait(lock);
- }
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
+ the_rx_notification.wait(lock);
+ }
- popped_value = the_queue.front();
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
the_queue.pop();
+ lock.unlock();
+ the_tx_notification.notify_one();
}
- the_tx_notification.notify_one();
}
private:
@@ -145,6 +186,6 @@ private:
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
};
-
diff --git a/crc.c b/crc.c
index 0c70263..cc02473 100644
--- a/crc.c
+++ b/crc.c
@@ -248,7 +248,6 @@ uint16_t crc16(uint16_t l_crc, const void *lp_data, unsigned l_nb)
{
const uint8_t* data = (const uint8_t*)lp_data;
while (l_nb--) {
- //fprintf(stdout, "crc 0x%02x 0x%04x\n", *data, l_crc);
l_crc =
(l_crc << 8) ^ crc16tab[(l_crc >> 8) ^ *(data++)];
}
diff --git a/decoder/common.cpp b/decoder/common.cpp
index d8a37e3..306261a 100644
--- a/decoder/common.cpp
+++ b/decoder/common.cpp
@@ -67,7 +67,7 @@ time_t frame_timestamp_t::to_unix_epoch() const
return 946684800 + seconds - utco;
}
-double frame_timestamp_t::diff_ms(const frame_timestamp_t& other) const
+double frame_timestamp_t::diff_s(const frame_timestamp_t& other) const
{
const double lhs = (double)seconds + (tsta / 16384000.0);
const double rhs = (double)other.seconds + (other.tsta / 16384000.0);
@@ -264,9 +264,15 @@ decode_state_t TagDispatcher::decode_afpacket(
}
// SEQ wraps at 0xFFFF, unsigned integer overflow is intentional
- const uint16_t expected_seq = m_last_seq + 1;
- if (expected_seq != seq) {
- etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
+ if (m_last_seq_valid) {
+ const uint16_t expected_seq = m_last_seq + 1;
+ if (expected_seq != seq) {
+ etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
+ }
+ }
+ else {
+ etiLog.level(info) << "EDI AF Packet initial sequence number: " << seq;
+ m_last_seq_valid = true;
}
m_last_seq = seq;
diff --git a/decoder/common.hpp b/decoder/common.hpp
index c149421..c8c4bb3 100644
--- a/decoder/common.hpp
+++ b/decoder/common.hpp
@@ -25,6 +25,7 @@
#include <map>
#include <chrono>
#include <string>
+#include <array>
#include <vector>
#include <cstddef>
#include <ctime>
@@ -41,7 +42,7 @@ struct frame_timestamp_t {
std::time_t to_unix_epoch() const;
std::chrono::system_clock::time_point to_system_clock() const;
- double diff_ms(const frame_timestamp_t& other) const;
+ double diff_s(const frame_timestamp_t& other) const;
frame_timestamp_t& operator+=(const std::chrono::milliseconds& ms);
@@ -106,6 +107,7 @@ class TagDispatcher {
bool decode_tagpacket(const std::vector<uint8_t> &payload);
PFT::PFT m_pft;
+ bool m_last_seq_valid = false;
uint16_t m_last_seq = 0;
std::vector<uint8_t> m_input_data;
std::map<std::string, tag_handler> m_handlers;