aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-10-01 12:15:52 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-10-01 12:15:52 +0200
commit15972d6445758f7b36c7b86ecbc5fb7e4ec3742f (patch)
treec14c52a8c11bc26e14831f68bcf1b8a6d4964623
parent9dd8274f4fd39bf0f3544a33e355821c56878608 (diff)
downloadodr-edilib-15972d6445758f7b36c7b86ecbc5fb7e4ec3742f.tar.gz
odr-edilib-15972d6445758f7b36c7b86ecbc5fb7e4ec3742f.tar.bz2
odr-edilib-15972d6445758f7b36c7b86ecbc5fb7e4ec3742f.zip
Add STI decoder
-rw-r--r--CMakeLists.txt6
-rw-r--r--Socket.cpp898
-rw-r--r--Socket.h294
-rw-r--r--ThreadsafeQueue.h (renamed from test/ThreadsafeQueue.h)0
-rw-r--r--decoder/ETIDecoder.cpp2
-rw-r--r--decoder/ETIWriter.cpp33
-rw-r--r--decoder/ETIWriter.hpp10
-rw-r--r--decoder/PFT.cpp2
-rw-r--r--decoder/PFT.hpp5
-rw-r--r--decoder/STIDecoder.cpp6
-rw-r--r--decoder/STIDecoder.hpp2
-rw-r--r--decoder/STIWriter.cpp20
-rw-r--r--decoder/STIWriter.hpp10
-rw-r--r--decoder/buffer_unpack.hpp2
-rw-r--r--decoder/common.cpp48
-rw-r--r--decoder/common.hpp18
-rw-r--r--decoder/eti.hpp4
-rw-r--r--test/AACDecoder.cpp170
-rw-r--r--test/AACDecoder.h52
-rw-r--r--test/UdpSocket.cpp256
-rw-r--r--test/UdpSocket.h176
-rw-r--r--test/main.cpp221
-rw-r--r--test/wavfile.cpp271
-rw-r--r--test/wavfile.h45
24 files changed, 1956 insertions, 595 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6d79c7e..de5ead6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -56,7 +56,9 @@ list(APPEND edilib_cpp_sources
test/main.cpp
test/TimestampDecoder.cpp
test/InetAddress.cpp
- test/UdpSocket.cpp
+ test/wavfile.cpp
+ test/AACDecoder.cpp
+ Socket.cpp
)
list(APPEND edilib_c_sources
@@ -69,7 +71,7 @@ list(APPEND edilib_c_sources
set_source_files_properties(${edilib_cpp_sources} PROPERTIES LANGUAGE "CXX")
set_source_files_properties(${edilib_c_sources} PROPERTIES LANGUAGE "C")
add_executable(edilib ${edilib_cpp_sources} ${edilib_c_sources})
-target_link_libraries(edilib ${FAAD_LIBRARIES} Threads::Threads)
+target_link_libraries(edilib ${FAAD_LIBRARIES} Threads::Threads fdk-aac)
install(TARGETS edilib DESTINATION bin)
diff --git a/Socket.cpp b/Socket.cpp
new file mode 100644
index 0000000..cd70a8e
--- /dev/null
+++ b/Socket.cpp
@@ -0,0 +1,898 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#include "Socket.h"
+
+#include <iostream>
+#include <cstdio>
+#include <cstring>
+#include <cerrno>
+#include <fcntl.h>
+#include <poll.h>
+
+namespace Socket {
+
+using namespace std;
+
+void InetAddress::resolveUdpDestination(const std::string& destination, int port)
+{
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(destination.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ // Take the first result
+ memcpy(&addr, rp->ai_addr, rp->ai_addrlen);
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not resolve");
+ }
+}
+
+UDPPacket::UDPPacket() { }
+
+UDPPacket::UDPPacket(size_t initSize) :
+ buffer(initSize)
+{ }
+
+
+UDPSocket::UDPSocket() :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(0, "");
+}
+
+UDPSocket::UDPSocket(int port) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, "");
+}
+
+UDPSocket::UDPSocket(int port, const std::string& name) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, name);
+}
+
+
+void UDPSocket::setBlocking(bool block)
+{
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
+ if (res == -1) {
+ throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
+void UDPSocket::reinit(int port, const std::string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(),
+ port == 0 ? nullptr : service,
+ &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void UDPSocket::close()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ m_sock = INVALID_SOCKET;
+}
+
+UDPSocket::~UDPSocket()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+}
+
+
+UDPPacket UDPSocket::receive(size_t max_size)
+{
+ UDPPacket packet(max_size);
+ socklen_t addrSize;
+ addrSize = sizeof(*packet.address.as_sockaddr());
+ ssize_t ret = recvfrom(m_sock,
+ packet.buffer.data(),
+ packet.buffer.size(),
+ 0,
+ packet.address.as_sockaddr(),
+ &addrSize);
+
+ if (ret == SOCKET_ERROR) {
+ packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
+ if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
+ return 0;
+ }
+ throw runtime_error(string("Can't receive data: ") + strerror(errno));
+ }
+
+ packet.buffer.resize(ret);
+ return packet;
+}
+
+void UDPSocket::send(UDPPacket& packet)
+{
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
+ packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+
+void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
+{
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
+ destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+{
+ ip_mreqn group;
+ if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
+ throw runtime_error("Cannot convert multicast group name");
+ }
+ if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
+ throw runtime_error("Group name is not a multicast address");
+ }
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
+ group.imr_ifindex = 0;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastSource(const char* source_addr)
+{
+ struct in_addr addr;
+ if (inet_aton(source_addr, &addr) == 0) {
+ throw runtime_error(string("Can't parse source address") + strerror(errno));
+ }
+
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set source address") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastTTL(int ttl)
+{
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ }
+}
+
+UDPReceiver::UDPReceiver() { }
+
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ }
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
+}
+
+TCPSocket::~TCPSocket()
+{
+ if (m_sock != -1) {
+ ::close(m_sock);
+ }
+}
+
+TCPSocket::TCPSocket(TCPSocket&& other) :
+ m_sock(other.m_sock),
+ m_remote_address(move(other.m_remote_address))
+{
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+}
+
+TCPSocket& TCPSocket::operator=(TCPSocket&& other)
+{
+ swap(m_remote_address, other.m_remote_address);
+
+ m_sock = other.m_sock;
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+
+ return *this;
+}
+
+bool TCPSocket::valid() const
+{
+ return m_sock != -1;
+}
+
+void TCPSocket::connect(const std::string& hostname, int port)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only connect an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ /* Obtain address(es) matching host/port */
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(hostname.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully connect(2).
+ If socket(2) (or connect(2)) fails, we (close the socket
+ and) try the next address. */
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype,
+ rp->ai_protocol);
+ if (sfd == -1)
+ continue;
+
+ int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
+ if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) {
+ // As the TCPClient could set the socket to nonblocking, we
+ // must handle EINPROGRESS here
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
+ == SOCKET_ERROR) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+ }
+
+ freeaddrinfo(result); /* No longer needed */
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not connect");
+ }
+
+}
+
+void TCPSocket::listen(int port, const string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only listen with an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0;
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
+ int ret = ::listen(m_sock, 0);
+ if (ret == -1) {
+ throw std::runtime_error(string("Could not listen: ") + strerror(errno));
+ }
+ }
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void TCPSocket::close()
+{
+ ::close(m_sock);
+ m_sock = -1;
+}
+
+TCPSocket TCPSocket::accept(int timeout_ms)
+{
+ if (timeout_ms == 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP Socket accept error: " + errstr);
+ }
+ else if (retval > 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ TCPSocket s(-1);
+ return s;
+ }
+ }
+}
+
+ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
+{
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process
+ * would not receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the
+ * same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ ssize_t sent = ::send(m_sock, buf, buflen, flags);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+}
+
+ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms)
+{
+ if (timeout_ms) {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLOUT;
+
+ const int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno));
+ }
+ else if (retval == 0) {
+ // Timed out
+ return 0;
+ }
+ }
+
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
+ * receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
+
+ if (ret == SOCKET_ERROR) {
+ throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno));
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
+{
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0 and (fds[0].revents | POLLIN)) {
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ if (errno == ECONNREFUSED) {
+ return 0;
+ }
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive after poll() error: " + errstr);
+ }
+ return ret;
+ }
+ else {
+ throw Timeout();
+ }
+}
+
+TCPSocket::TCPSocket(int sockfd) :
+ m_sock(sockfd),
+ m_remote_address()
+{ }
+
+TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) :
+ m_sock(sockfd),
+ m_remote_address(remote_address)
+{ }
+
+void TCPClient::connect(const std::string& hostname, int port)
+{
+ m_hostname = hostname;
+ m_port = port;
+ reconnect();
+}
+
+ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ try {
+ ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
+
+ if (ret == 0) {
+ m_sock.close();
+
+ TCPSocket newsock;
+ m_sock = std::move(newsock);
+ reconnect();
+ }
+
+ return ret;
+ }
+ catch (const TCPSocket::Interrupted&) {
+ return -1;
+ }
+ catch (const TCPSocket::Timeout&) {
+ return 0;
+ }
+
+ return 0;
+}
+
+void TCPClient::reconnect()
+{
+ int flags = fcntl(m_sock.m_sock, F_GETFL);
+ if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ m_sock.connect(m_hostname, m_port);
+}
+
+TCPConnection::TCPConnection(TCPSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock))
+{
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+ m_sender_thread = std::thread(&TCPConnection::process, this);
+}
+
+TCPConnection::~TCPConnection()
+{
+ m_running = false;
+ vector<uint8_t> termination_marker;
+ queue.push(termination_marker);
+ m_sender_thread.join();
+}
+
+void TCPConnection::process()
+{
+ while (m_running) {
+ vector<uint8_t> data;
+ queue.wait_and_pop(data);
+
+ if (data.empty()) {
+ // empty vector is the termination marker
+ m_running = false;
+ break;
+ }
+
+ try {
+ ssize_t remaining = data.size();
+ const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
+ const int timeout_ms = 10; // Less than one ETI frame
+
+ while (m_running and remaining > 0) {
+ const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
+ if (sent < 0 or sent > remaining) {
+ throw std::logic_error("Invalid TCPSocket::send() return value");
+ }
+ remaining -= sent;
+ buf += sent;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "Dropping TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+}
+
+
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
+ m_max_queue_size(max_queue_size)
+{
+}
+
+TCPDataDispatcher::~TCPDataDispatcher()
+{
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+void TCPDataDispatcher::start(int port, const string& address)
+{
+ m_listener_socket.listen(port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
+}
+
+void TCPDataDispatcher::write(const vector<uint8_t>& data)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if(
+ [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+}
+
+void TCPDataDispatcher::process()
+{
+ try {
+ const int timeout_ms = 1000;
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ auto sock = m_listener_socket.accept(timeout_ms);
+ if (sock.valid()) {
+ m_connections.emplace(m_connections.begin(), move(sock));
+ }
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_exception_data = string("TCPDataDispatcher error: ") + e.what();
+ m_running = false;
+ }
+}
+
+TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
+ m_blocksize(blocksize)
+{
+}
+
+void TCPReceiveServer::start(int listen_port, const std::string& address)
+{
+ m_listener_socket.listen(listen_port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPReceiveServer::process, this);
+}
+
+TCPReceiveServer::~TCPReceiveServer()
+{
+ m_running = false;
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+vector<uint8_t> TCPReceiveServer::receive()
+{
+ vector<uint8_t> buffer;
+ m_queue.try_pop(buffer);
+
+ // we can ignore try_pop()'s return value, because
+ // if it is unsuccessful the buffer is not touched.
+ return buffer;
+}
+
+void TCPReceiveServer::process()
+{
+ constexpr int timeout_ms = 1000;
+ constexpr int disconnect_timeout_ms = 10000;
+ constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms;
+
+ while (m_running) {
+ auto sock = m_listener_socket.accept(timeout_ms);
+
+ int num_timeouts = 0;
+
+ while (m_running and sock.valid()) {
+ try {
+ vector<uint8_t> buf(m_blocksize);
+ ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms);
+ if (r < 0) {
+ throw logic_error("Invalid recv return value");
+ }
+ else if (r == 0) {
+ sock.close();
+ break;
+ }
+ else {
+ buf.resize(r);
+ m_queue.push(move(buf));
+ }
+ }
+ catch (const TCPSocket::Interrupted&) {
+ break;
+ }
+ catch (const TCPSocket::Timeout&) {
+ num_timeouts++;
+ }
+
+ if (num_timeouts > max_num_timeouts) {
+ sock.close();
+ }
+ }
+ }
+}
+
+}
diff --git a/Socket.h b/Socket.h
new file mode 100644
index 0000000..8bb7fe1
--- /dev/null
+++ b/Socket.h
@@ -0,0 +1,294 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "ThreadsafeQueue.h"
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <atomic>
+#include <thread>
+#include <list>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+
+
+namespace Socket {
+
+struct InetAddress {
+ struct sockaddr_storage addr;
+
+ struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
+
+ void resolveUdpDestination(const std::string& destination, int port);
+};
+
+/** This class represents a UDP packet.
+ *
+ * A UDP packet contains a payload (sequence of bytes) and an address. For
+ * outgoing packets, the address is the destination address. For incoming
+ * packets, the address tells the user from what source the packet arrived from.
+ */
+class UDPPacket
+{
+ public:
+ UDPPacket();
+ UDPPacket(size_t initSize);
+
+ std::vector<uint8_t> buffer;
+ InetAddress address;
+};
+
+/**
+ * This class represents a socket for sending and receiving UDP packets.
+ *
+ * A UDP socket is the sending or receiving point for a packet delivery service.
+ * Each packet sent or received on a datagram socket is individually
+ * addressed and routed. Multiple packets sent from one machine to another may
+ * be routed differently, and may arrive in any order.
+ */
+class UDPSocket
+{
+ public:
+ /** Create a new socket that will not be bound to any port. To be used
+ * for data output.
+ */
+ UDPSocket();
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ */
+ UDPSocket(int port);
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ * @param name The IP address on which the socket will be bound.
+ * It is used to bind the socket on a specific interface if
+ * the computer have many NICs.
+ */
+ UDPSocket(int port, const std::string& name);
+ ~UDPSocket();
+ UDPSocket(const UDPSocket& other) = delete;
+ const UDPSocket& operator=(const UDPSocket& other) = delete;
+
+ /** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
+ void reinit(int port, const std::string& name);
+
+ void close(void);
+ void send(UDPPacket& packet);
+ void send(const std::vector<uint8_t>& data, InetAddress destination);
+ UDPPacket receive(size_t max_size);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
+ void setMulticastSource(const char* source_addr);
+ void setMulticastTTL(int ttl);
+
+ /** Set blocking mode. By default, the socket is blocking.
+ * throws a runtime_error on error.
+ */
+ void setBlocking(bool block);
+
+ protected:
+ SOCKET m_sock;
+};
+
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver();
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port = 0;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued = 1;
+ std::thread m_thread;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
+
+class TCPSocket {
+ public:
+ TCPSocket();
+ ~TCPSocket();
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other);
+ TCPSocket& operator=(TCPSocket&& other);
+
+ bool valid(void) const;
+ void connect(const std::string& hostname, int port);
+ void listen(int port, const std::string& name);
+ void close(void);
+
+ /* throws a runtime_error on failure, an invalid socket on timeout */
+ TCPSocket accept(int timeout_ms);
+
+ /* returns -1 on error, doesn't work on nonblocking sockets */
+ ssize_t sendall(const void *buffer, size_t buflen);
+
+ /** Send data over the TCP connection.
+ * @param data The buffer that will be sent.
+ * @param size Number of bytes to send.
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
+ */
+ ssize_t send(const void* data, size_t size, int timeout_ms=0);
+
+ /* Returns number of bytes read, 0 on disconnect. Throws a
+ * runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags);
+
+ class Timeout {};
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect or refused connection.
+ * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error
+ */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ explicit TCPSocket(int sockfd);
+ explicit TCPSocket(int sockfd, InetAddress remote_address);
+ SOCKET m_sock = -1;
+
+ InetAddress m_remote_address;
+
+ friend class TCPClient;
+};
+
+/* Implements a TCP receiver that auto-reconnects on errors */
+class TCPClient {
+ public:
+ void connect(const std::string& hostname, int port);
+
+ /* Returns numer of bytes read, 0 on auto-reconnect, -1
+ * on interruption.
+ * Throws a runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ void reconnect(void);
+ TCPSocket m_sock;
+ std::string m_hostname;
+ int m_port;
+};
+
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TCPSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TCPSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process();
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
+}
diff --git a/test/ThreadsafeQueue.h b/ThreadsafeQueue.h
index 911fdc8..911fdc8 100644
--- a/test/ThreadsafeQueue.h
+++ b/ThreadsafeQueue.h
diff --git a/decoder/ETIDecoder.cpp b/decoder/ETIDecoder.cpp
index a1b801b..1fa9c3c 100644
--- a/decoder/ETIDecoder.cpp
+++ b/decoder/ETIDecoder.cpp
@@ -22,7 +22,7 @@
#include "buffer_unpack.hpp"
#include "crc.h"
#include "Log.h"
-#include <stdio.h>
+#include <cstdio>
#include <cassert>
#include <sstream>
diff --git a/decoder/ETIWriter.cpp b/decoder/ETIWriter.cpp
index e51eccd..aabd191 100644
--- a/decoder/ETIWriter.cpp
+++ b/decoder/ETIWriter.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2017
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -34,23 +34,6 @@ namespace EdiDecoder {
using namespace std;
-string eti_frame_t::calculate_timestamp() const
-{
- const time_t seconds_in_unix_epoch = timestamp_in_unix_epoch();
-
- stringstream ss;
- ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z");
- return ss.str();
-}
-
-time_t eti_frame_t::timestamp_in_unix_epoch() const
-{
- // EDI epoch: 2000-01-01T00:00:00Z
- // Convert using
- // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))'
- return 946684800 + seconds - utco;
-}
-
void ETIWriter::update_protocol(
const std::string& proto,
uint16_t major,
@@ -68,7 +51,7 @@ void ETIWriter::reinit()
m_proto_valid = false;
m_fc_valid = false;
m_fic.clear();
- m_etiFrame.clear();
+ m_etiFrame.frame.clear();
m_subchannels.clear();
}
@@ -301,20 +284,20 @@ void ETIWriter::assemble()
eti.resize(6144, 0x55);
- m_etiFrame = eti;
-
+ m_etiFrame.frame = move(eti);
+ m_etiFrame.timestamp.seconds = m_seconds;
+ m_etiFrame.timestamp.utco = m_utco;
}
eti_frame_t ETIWriter::getEtiFrame()
{
- if (m_etiFrame.empty()) {
+ if (m_etiFrame.frame.empty()) {
return {};
}
eti_frame_t eti;
- eti.frame = move(m_etiFrame);
- eti.seconds = m_seconds;
- eti.utco = m_utco;
+ swap(eti, m_etiFrame);
+
reinit();
return eti;
diff --git a/decoder/ETIWriter.hpp b/decoder/ETIWriter.hpp
index 0b1dc0e..4b2acd7 100644
--- a/decoder/ETIWriter.hpp
+++ b/decoder/ETIWriter.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2016
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -31,11 +31,7 @@ namespace EdiDecoder {
struct eti_frame_t {
std::vector<uint8_t> frame;
- uint32_t seconds = 0;
- uint32_t utco = 0;
-
- std::string calculate_timestamp() const;
- time_t timestamp_in_unix_epoch() const;
+ frame_timestamp_t timestamp;
};
class ETIWriter : public ETIDataCollector {
@@ -91,7 +87,7 @@ class ETIWriter : public ETIDataCollector {
// m_fic is valid if non-empty
std::vector<uint8_t> m_fic;
- std::vector<uint8_t> m_etiFrame;
+ eti_frame_t m_etiFrame;
std::list<eti_stc_data> m_subchannels;
diff --git a/decoder/PFT.cpp b/decoder/PFT.cpp
index aff7929..158b206 100644
--- a/decoder/PFT.cpp
+++ b/decoder/PFT.cpp
@@ -20,7 +20,7 @@
* -------------------------------------------------------------------
*/
-#include <stdio.h>
+#include <cstdio>
#include <cassert>
#include <cstring>
#include <sstream>
diff --git a/decoder/PFT.hpp b/decoder/PFT.hpp
index 779509b..208fd70 100644
--- a/decoder/PFT.hpp
+++ b/decoder/PFT.hpp
@@ -21,10 +21,11 @@
*/
#pragma once
-#include <stdio.h>
+#include <cstdio>
+#include <cstdint>
#include <vector>
#include <map>
-#include <stdint.h>
+#include <string>
namespace EdiDecoder {
namespace PFT {
diff --git a/decoder/STIDecoder.cpp b/decoder/STIDecoder.cpp
index ca8cead..2f196df 100644
--- a/decoder/STIDecoder.cpp
+++ b/decoder/STIDecoder.cpp
@@ -22,7 +22,7 @@
#include "buffer_unpack.hpp"
#include "crc.h"
#include "Log.h"
-#include <stdio.h>
+#include <cstdio>
#include <cassert>
#include <sstream>
@@ -75,6 +75,8 @@ bool STIDecoder::decode_starptr(const vector<uint8_t> &value, uint16_t)
copy(value.begin(), value.begin() + 4, protocol_sz);
string protocol(protocol_sz);
+ etiLog.level(debug) << "STI *PTR " << protocol;
+
uint16_t major = read_16b(value.begin() + 4);
uint16_t minor = read_16b(value.begin() + 6);
@@ -98,6 +100,7 @@ bool STIDecoder::decode_dsti(const vector<uint8_t> &value, uint16_t)
uint8_t dfcth = (dstiHeader >> 8) & 0x1F;
uint8_t dfctl = dstiHeader & 0xFF;
+ etiLog.log(debug, "STI DSTI DFLC=%d * 250 + %d", dfcth, dfctl);
md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter
const size_t expected_length = 2 +
@@ -157,6 +160,7 @@ bool STIDecoder::decode_ssn(const vector<uint8_t> &value, uint16_t n)
sti_payload_data sti;
sti.stream_index = n - 1; // n is 1-indexed
+ etiLog.log(debug, "STI SSn=%d", n);
sti.rfa = value[0] >> 3;
sti.tid = value[0] & 0x07;
diff --git a/decoder/STIDecoder.hpp b/decoder/STIDecoder.hpp
index 3376a01..3f6f729 100644
--- a/decoder/STIDecoder.hpp
+++ b/decoder/STIDecoder.hpp
@@ -20,12 +20,12 @@
*/
#pragma once
-#include "eti.hpp"
#include "common.hpp"
#include <cstdint>
#include <deque>
#include <string>
#include <vector>
+#include <array>
namespace EdiDecoder {
diff --git a/decoder/STIWriter.cpp b/decoder/STIWriter.cpp
index 5083c64..8f527b0 100644
--- a/decoder/STIWriter.cpp
+++ b/decoder/STIWriter.cpp
@@ -53,6 +53,7 @@ void STIWriter::reinit()
m_stat_valid = false;
m_time_valid = false;
m_payload_valid = false;
+ m_stiFrame.frame.clear();
}
void STIWriter::update_stat(uint8_t stat, uint16_t spid)
@@ -96,12 +97,13 @@ void STIWriter::update_edi_time(
// TODO check validity
m_time_valid = true;
-
}
void STIWriter::assemble()
{
+ fprintf(stderr, "STIWriter::assemble DFLC=%d\n", m_management_data.dflc);
+
if (not m_proto_valid) {
throw std::logic_error("Cannot assemble STI before protocol");
}
@@ -115,15 +117,25 @@ void STIWriter::assemble()
}
// TODO check time validity
+
+ // Do copies so as to preserve existing payload data
+ m_stiFrame.frame = m_payload.istd;
+ m_stiFrame.timestamp.seconds = m_seconds;
+ m_stiFrame.timestamp.utco = m_utco;
+ m_stiFrame.timestamp.tsta = m_management_data.tsta;
}
-std::vector<uint8_t> STIWriter::getFrame()
+sti_frame_t STIWriter::getFrame()
{
- if (not m_payload_valid) {
+ if (m_stiFrame.frame.empty()) {
return {};
}
+ fprintf(stderr, "STIWriter::getframe DFLC=%d\n", m_management_data.dflc);
- return m_payload.istd;
+ sti_frame_t sti;
+ swap(sti, m_stiFrame);
+ reinit();
+ return sti;
}
}
diff --git a/decoder/STIWriter.hpp b/decoder/STIWriter.hpp
index 579009f..a75cb69 100644
--- a/decoder/STIWriter.hpp
+++ b/decoder/STIWriter.hpp
@@ -20,6 +20,7 @@
*/
#pragma once
+#include "common.hpp"
#include "STIDecoder.hpp"
#include <cstdint>
#include <string>
@@ -28,6 +29,11 @@
namespace EdiDecoder {
+struct sti_frame_t {
+ std::vector<uint8_t> frame;
+ frame_timestamp_t timestamp;
+};
+
class STIWriter : public STIDataCollector {
public:
// Tell the ETIWriter what EDI protocol we receive in *ptr.
@@ -50,7 +56,7 @@ class STIWriter : public STIDataCollector {
virtual void assemble(void);
// Return the assembled frame or an empty frame if not ready
- std::vector<uint8_t> getFrame();
+ sti_frame_t getFrame();
private:
void reinit(void);
@@ -70,6 +76,8 @@ class STIWriter : public STIDataCollector {
bool m_payload_valid = false;
sti_payload_data m_payload;
+
+ sti_frame_t m_stiFrame;
};
}
diff --git a/decoder/buffer_unpack.hpp b/decoder/buffer_unpack.hpp
index 05a1534..a996017 100644
--- a/decoder/buffer_unpack.hpp
+++ b/decoder/buffer_unpack.hpp
@@ -20,7 +20,7 @@
*/
#pragma once
-#include <stdint.h>
+#include <cstdint>
namespace EdiDecoder {
diff --git a/decoder/common.cpp b/decoder/common.cpp
index 25f12b0..cb1f425 100644
--- a/decoder/common.cpp
+++ b/decoder/common.cpp
@@ -22,14 +22,56 @@
#include "buffer_unpack.hpp"
#include "Log.h"
#include "crc.h"
-#include <cstdio>
-#include <cassert>
+#include <iomanip>
#include <sstream>
+#include <cassert>
+#include <cmath>
+#include <cstdio>
namespace EdiDecoder {
using namespace std;
+bool frame_timestamp_t::valid() const
+{
+ return tsta != 0xFFFFFF;
+}
+
+string frame_timestamp_t::to_string() const
+{
+ const time_t seconds_in_unix_epoch = to_unix_epoch();
+
+ stringstream ss;
+ if (valid()) {
+ ss << "Timestamp: ";
+ }
+ else {
+ ss << "Timestamp not valid: ";
+ }
+ ss << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z") <<
+ " + " << ((double)tsta / 16384000.0);
+ return ss.str();
+}
+
+time_t frame_timestamp_t::to_unix_epoch() const
+{
+ // EDI epoch: 2000-01-01T00:00:00Z
+ // Convert using
+ // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))'
+ return 946684800 + seconds - utco;
+}
+
+std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const
+{
+ auto ts = chrono::system_clock::from_time_t(to_unix_epoch());
+
+ // PPS offset in seconds = tsta / 16384000
+ ts += chrono::nanoseconds(std::lrint(tsta / 0.016384));
+
+ return ts;
+}
+
+
TagDispatcher::TagDispatcher(
std::function<void()>&& af_packet_completed, bool verbose) :
m_af_packet_completed(move(af_packet_completed))
@@ -160,7 +202,7 @@ decode_state_t TagDispatcher::decode_afpacket(
return {false, 0};
}
- if (m_last_seq + 1 != seq) {
+ if (m_last_seq + (uint16_t)1 != seq) {
etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
}
m_last_seq = seq;
diff --git a/decoder/common.hpp b/decoder/common.hpp
index b5df890..887bc3d 100644
--- a/decoder/common.hpp
+++ b/decoder/common.hpp
@@ -21,13 +21,27 @@
#pragma once
#include "PFT.hpp"
-#include <vector>
-#include <map>
#include <functional>
+#include <map>
+#include <chrono>
+#include <string>
+#include <vector>
#include <cstddef>
+#include <ctime>
namespace EdiDecoder {
+struct frame_timestamp_t {
+ uint32_t seconds = 0;
+ uint32_t utco = 0;
+ uint32_t tsta = 0; // According to EN 300 797 Annex B
+
+ bool valid() const;
+ std::string to_string() const;
+ time_t to_unix_epoch() const;
+ std::chrono::system_clock::time_point to_system_clock() const;
+};
+
struct decode_state_t {
decode_state_t(bool _complete, size_t _num_bytes_consumed) :
complete(_complete), num_bytes_consumed(_num_bytes_consumed) {}
diff --git a/decoder/eti.hpp b/decoder/eti.hpp
index 451ca48..372f098 100644
--- a/decoder/eti.hpp
+++ b/decoder/eti.hpp
@@ -24,10 +24,10 @@
#pragma once
-#include <stdint.h>
+#include <cstdint>
#define PACKED __attribute__ ((packed))
-#include <time.h>
+#include <ctime>
namespace EdiDecoder {
diff --git a/test/AACDecoder.cpp b/test/AACDecoder.cpp
new file mode 100644
index 0000000..83d7b26
--- /dev/null
+++ b/test/AACDecoder.cpp
@@ -0,0 +1,170 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2011 Martin Storsjo
+ * Copyright (C) 2017 Matthias P. Braendli
+ * Copyright (C) 2016 Stefan Pöschel
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#include "config.h"
+#include "AACDecoder.h"
+#include <stdexcept>
+#include <string>
+
+AACDecoder::AACDecoder(const char* wavfilename) :
+ m_wav_writer(wavfilename)
+{
+ m_handle = aacDecoder_Open(TT_MP4_RAW, 1);
+ if (not m_handle) {
+ throw std::runtime_error("AACDecoder: error opening decoder");
+ }
+}
+
+AACDecoder::~AACDecoder()
+{
+ if (m_handle) {
+ aacDecoder_Close(m_handle);
+ }
+}
+
+void AACDecoder::decode_frame(uint8_t *data, size_t len)
+{
+ const bool dac_rate = data[2] & 0x40;
+ const bool sbr_flag = data[2] & 0x20;
+ const bool aac_channel_mode = data[2] & 0x10;
+ const bool ps_flag = data[2] & 0x08;
+ const uint8_t mpeg_surround_config = data[2] & 0x07;
+
+ const int core_sr_index = dac_rate ?
+ (sbr_flag ? 6 : 3) : (sbr_flag ? 8 : 5); // 24/48/16/32 kHz
+ const int core_ch_config = aac_channel_mode ? 2 : 1;
+ const int extension_sr_index = dac_rate ? 3 : 5; // 48/32 kHz
+
+ int au_start[6] = {};
+
+ int num_aus = dac_rate ? (sbr_flag ? 3 : 6) : (sbr_flag ? 2 : 4);
+ au_start[0] = dac_rate ? (sbr_flag ? 6 : 11) : (sbr_flag ? 5 : 8);
+ au_start[1] = data[3] << 4 | data[4] >> 4;
+
+ fprintf(stderr, "AAC: %d AUs, sbr %d\n", num_aus, sbr_flag);
+
+ if (num_aus >= 3) {
+ au_start[2] = (data[4] & 0x0F) << 8 | data[5];
+ }
+
+ if (num_aus >= 4) {
+ au_start[3] = data[6] << 4 | data[7] >> 4;
+ }
+
+ if (num_aus == 6) {
+ au_start[4] = (data[7] & 0x0F) << 8 | data[8];
+ au_start[5] = data[9] << 4 | data[10] >> 4;
+ }
+
+ au_start[num_aus] = len; // end of the buffer
+
+ for (int i = 0; i < num_aus; i++) {
+ if (au_start[i] >= au_start[i+1]) {
+ throw std::runtime_error(" AU ordering check failed\n");
+ }
+ }
+
+ if (not m_decoder_set_up) {
+ std::vector<uint8_t> asc;
+
+ // AAC LC
+ asc.push_back(0b00010 << 3 | core_sr_index >> 1);
+ asc.push_back((core_sr_index & 0x01) << 7 | core_ch_config << 3 | 0b100);
+
+ if (sbr_flag) {
+ // add SBR
+ asc.push_back(0x56);
+ asc.push_back(0xE5);
+ asc.push_back(0x80 | (extension_sr_index << 3));
+
+ if (ps_flag) {
+ // add PS
+ asc.back() |= 0x05;
+ asc.push_back(0x48);
+ asc.push_back(0x80);
+ }
+ }
+
+ uint8_t* asc_array[1] {asc.data()};
+ const unsigned int asc_sizeof_array[1] {(unsigned int) asc.size()};
+
+ AAC_DECODER_ERROR init_result = aacDecoder_ConfigRaw(m_handle,
+ asc_array, asc_sizeof_array);
+ if (init_result != AAC_DEC_OK) {
+ throw std::runtime_error(
+ "AACDecoderFDKAAC: error while aacDecoder_ConfigRaw: " +
+ std::to_string(init_result));
+ }
+
+ m_channels = (aac_channel_mode or ps_flag) ? 2 : 1;
+ size_t output_frame_len = 960 * 2 * m_channels * (sbr_flag ? 2 : 1);
+ m_output_frame.resize(output_frame_len);
+ fprintf(stderr, " Setting decoder output frame len %zu\n", output_frame_len);
+
+ const int sample_rate = dac_rate ? 48000 : 32000;
+ m_wav_writer.initialise_header(sample_rate, m_channels);
+ m_decoder_set_up = true;
+
+ fprintf(stderr, " Set up decoder with %d Hz, %s%swith %d channels\n",
+ sample_rate, (sbr_flag ? "SBR " : ""), (ps_flag ? "PS " : ""),
+ m_channels);
+
+ }
+
+ const size_t AU_CRCLEN = 2;
+ for (int i = 0; i < num_aus; i++) {
+ uint8_t *au_data = data + au_start[i];
+ size_t au_len = au_start[i+1] - au_start[i] - AU_CRCLEN;
+ decode_au(au_data, au_len);
+ }
+}
+
+void AACDecoder::decode_au(uint8_t *data, size_t len)
+{
+ uint8_t* input_buffer[1] {data};
+ const unsigned int input_buffer_size[1] {(unsigned int) len};
+ unsigned int bytes_valid = len;
+
+ // fill internal input buffer
+ AAC_DECODER_ERROR result = aacDecoder_Fill(
+ m_handle, input_buffer, input_buffer_size, &bytes_valid);
+
+ if (result != AAC_DEC_OK) {
+ throw std::runtime_error(
+ "AACDecoderFDKAAC: error while aacDecoder_Fill: " +
+ std::to_string(result));
+ }
+
+ if (bytes_valid) {
+ throw std::runtime_error(
+ "AACDecoderFDKAAC: aacDecoder_Fill did not consume all bytes");
+ }
+
+ // decode audio
+ result = aacDecoder_DecodeFrame(m_handle,
+ (short int*)m_output_frame.data(), m_output_frame.size(), 0);
+ if (result != AAC_DEC_OK) {
+ throw std::runtime_error(
+ "AACDecoderFDKAAC: error while aacDecoder_DecodeFrame: " +
+ std::to_string(result));
+ }
+
+ m_wav_writer.write_data(m_output_frame.data(), m_output_frame.size());
+}
diff --git a/test/AACDecoder.h b/test/AACDecoder.h
new file mode 100644
index 0000000..e9c179d
--- /dev/null
+++ b/test/AACDecoder.h
@@ -0,0 +1,52 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2011 Martin Storsjo
+ * Copyright (C) 2017 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+/*!
+ * \file AACDecoder.h
+ * \brief Uses FDK-AAC to decode the AAC format for loopback tests
+ */
+
+#pragma once
+
+#include <fdk-aac/aacdecoder_lib.h>
+#include <cstdint>
+#include <vector>
+#include "wavfile.h"
+
+class AACDecoder {
+ public:
+ AACDecoder(const char* wavfilename);
+ ~AACDecoder();
+ AACDecoder(const AACDecoder&) = delete;
+ const AACDecoder& operator=(const AACDecoder&) = delete;
+
+ void decode_frame(uint8_t *data, size_t len);
+
+ private:
+ void decode_au(uint8_t *data, size_t len);
+
+ bool m_decoder_set_up = false;
+
+ int m_channels = 0;
+
+ WavWriter m_wav_writer;
+ HANDLE_AACDECODER m_handle;
+ std::vector<uint8_t> m_output_frame;
+};
+
diff --git a/test/UdpSocket.cpp b/test/UdpSocket.cpp
deleted file mode 100644
index ccdd7ed..0000000
--- a/test/UdpSocket.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-using namespace std;
-
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- reinit(0, "");
-}
-
-UdpSocket::UdpSocket(int port) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, "");
-}
-
-UdpSocket::UdpSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, name);
-}
-
-
-int UdpSocket::setBlocking(bool block)
-{
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-}
-
-int UdpSocket::reinit(int port, const std::string& name)
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (port) {
- address.setAddress(name);
- address.setPort(port);
-
- if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- ::close(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- }
- return 0;
-}
-
-int UdpSocket::close()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- listenSocket = INVALID_SOCKET;
-
- return 0;
-}
-
-UdpSocket::~UdpSocket()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-}
-
-
-int UdpSocket::receive(UdpPacket& packet)
-{
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
- }
- setInetError("Can't receive UDP packet");
- return -1;
- }
-
- packet.setSize(ret);
- return 0;
-}
-
-int UdpSocket::send(UdpPacket& packet)
-{
- int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
-{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(char* groupname)
-{
- ip_mreqn group;
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
- group.imr_address.s_addr = htons(INADDR_ANY);;
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-UdpPacket::UdpPacket() { }
-
-UdpPacket::UdpPacket(size_t initSize) :
- m_buffer(initSize)
-{ }
-
-
-void UdpPacket::setSize(size_t newSize)
-{
- m_buffer.resize(newSize);
-}
-
-
-uint8_t* UdpPacket::getData()
-{
- return &m_buffer[0];
-}
-
-
-void UdpPacket::addData(const void *data, size_t size)
-{
- uint8_t *d = (uint8_t*)data;
- std::copy(d, d + size, std::back_inserter(m_buffer));
-}
-
-size_t UdpPacket::getSize()
-{
- return m_buffer.size();
-}
-
-InetAddress UdpPacket::getAddress()
-{
- return address;
-}
-
diff --git a/test/UdpSocket.h b/test/UdpSocket.h
deleted file mode 100644
index ba8f383..0000000
--- a/test/UdpSocket.h
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _UDPSOCKET
-#define _UDPSOCKET
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- */
-class UdpSocket
-{
- public:
- /** Create a new socket that will not be bound to any port. To be used
- * for data output.
- */
- UdpSocket();
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- */
- UdpSocket(int port);
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- UdpSocket(int port, const std::string& name);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- /** reinitialise socket. Close the already open socket, and
- * create a new one
- */
- int reinit(int port, const std::string& name);
-
- /** Close the socket
- */
- int close(void);
-
- /** Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
- int send(UdpPacket& packet);
-
- /** Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
- int send(const std::vector<uint8_t>& data, InetAddress destination);
-
- /** Receive an UDP packet.
- * @param packet The packet that will receive the data. The address will be set
- * to the source address.
- * @return 0 if ok, -1 if error
- */
- int receive(UdpPacket& packet);
-
- int joinGroup(char* groupname);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
-
- /** Set blocking mode. By default, the socket is blocking.
- * @return 0 if ok
- * -1 if error
- */
- int setBlocking(bool block);
-
- protected:
-
- /// The address on which the socket is bound.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/** This class represents a UDP packet.
- *
- * A UDP packet contains a payload (sequence of bytes) and an address. For
- * outgoing packets, the address is the destination address. For incoming
- * packets, the address tells the user from what source the packet arrived from.
- */
-class UdpPacket
-{
- public:
- /** Construct an empty UDP packet.
- */
- UdpPacket();
- UdpPacket(size_t initSize);
-
- /** Give the pointer to data.
- * @return The pointer
- */
- uint8_t* getData(void);
-
- /** Append some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
- void addData(const void *data, size_t size);
-
- size_t getSize(void);
-
- /** Changes size of the data buffer size. Keeps data intact unless
- * truncated.
- */
- void setSize(size_t newSize);
-
- /** Returns the UDP address of the packet.
- */
- InetAddress getAddress(void);
-
- const std::vector<uint8_t>& getBuffer(void) const {
- return m_buffer;
- }
-
-
- private:
- std::vector<uint8_t> m_buffer;
- InetAddress address;
-};
-
-#endif // _UDPSOCKET
diff --git a/test/main.cpp b/test/main.cpp
index 94df93d..537f162 100644
--- a/test/main.cpp
+++ b/test/main.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2017
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -23,6 +23,7 @@
#include <regex>
#include <string>
#include <thread>
+#include <numeric>
#include <vector>
#include <cstdint>
#include <cstdio>
@@ -32,79 +33,35 @@
#include "ETIWriter.hpp"
#include "STIDecoder.hpp"
#include "STIWriter.hpp"
-#include "UdpSocket.h"
+#include "AACDecoder.h"
+#include "Socket.h"
#include "ThreadsafeQueue.h"
#include "TimestampDecoder.h"
+using namespace std;
+
struct options_t {
std::string edi_source;
+ bool tcp_server = false;
std::string out_file;
- int verbose;
- bool enable_packet_loss;
- bool decode_sti;
- int packet_loss_inv_rate;
+ int verbose = 0;
+ bool enable_packet_loss = false;
+ bool decode_sti = false;
+ bool decode_aac = false;
+ int packet_loss_inv_rate = 0;
int max_delay;
};
-class UdpReceiver {
- public:
- UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
- ~UdpReceiver() {
- m_stop = true;
- if (m_thread.joinable()) {
- m_thread.join();
- }
- }
-
- void start(int port) {
- m_port = port;
- m_thread = std::thread(&UdpReceiver::m_run, this);
- }
-
- std::vector<uint8_t> get_packet_buffer(void) {
- UdpPacket p;
- m_packets.wait_and_pop(p);
-
- return p.getBuffer();
- }
-
- private:
- void m_run(void) {
- UdpSocket sock(m_port);
-
- const size_t packsize = 8192;
-
- while (not m_stop) {
- UdpPacket packet(packsize);
- int ret = sock.receive(packet);
- if (ret == 0) {
- if (packet.getSize() == packsize) {
- fprintf(stderr, "Warning, possible UDP truncation\n");
- }
- m_packets.push(packet);
- }
- else
- {
- fprintf(stderr, "Socket error: %s\n", inetErrMsg);
- m_stop = true;
- }
- }
- }
-
- int m_port;
- std::thread m_thread;
- std::atomic<bool> m_stop;
- ThreadsafeQueue<UdpPacket> m_packets;
-};
-
static void printUsage(char *name)
{
- fprintf(stderr, "Usage:\n %s <edi_source> [-v] [-s] [-o file] [-e inv_rate] [-m delay]\n", name);
+ fprintf(stderr, "Usage:\n %s <edi_source> [-S] [-v] [-s] [-a] [-o file] [-e inv_rate] [-m delay]\n", name);
fprintf(stderr, " edi_source is either a file, udp://:port, or tcp://addr:port\n");
+ fprintf(stderr, " -S For TCP: behave as a server\n");
fprintf(stderr, " -v Verbose mode (can be given several times)\n");
fprintf(stderr, " -s Decode STI instead of ETI\n");
+ fprintf(stderr, " -a Decode AAC contained in STI into edilib-aac.wav\n");
fprintf(stderr, " -e Enable packet loss simulator with probability 1/inv_rate\n");
fprintf(stderr, " -m Set max delay in AF Packets before we timeout\n");
fprintf(stderr, " -o Save decoded ETI or STI to file\n");
@@ -114,17 +71,16 @@ static options_t parseargs(int argc, char **argv)
{
options_t options;
- options.verbose = 0;
- options.max_delay = 0;
- options.enable_packet_loss = false;
-
while (true) {
- int c = getopt(argc, argv, "e:hm:o:sv");
+ int c = getopt(argc, argv, "ae:hm:o:sSv");
if (c == -1) {
break;
}
switch (c) {
+ case 'a':
+ options.decode_aac = true;
+ break;
case 'e':
options.enable_packet_loss = true;
options.packet_loss_inv_rate = strtol(optarg, nullptr, 10);
@@ -138,6 +94,9 @@ static options_t parseargs(int argc, char **argv)
case 's':
options.decode_sti = true;
break;
+ case 'S':
+ options.tcp_server = true;
+ break;
case '?':
case 'h':
printUsage(argv[0]);
@@ -171,18 +130,67 @@ static void analyse_timestamp(
const auto ts = ts_dec.getTimestamp();
- fprintf(stderr, "%ld: Got ETI Frame with FCT %d at %s: seconds=%d, utco=%d, now-EDI=%ld, MNSC: %d, now-MNSC=%ld, MNSC-EDI=%ld\n",
+ fprintf(stderr, "%ld: Got ETI Frame with FCT %d at %s: seconds=%d, utco=%d, now-EDI=%ld, MNSC: %d, now-MNSC=%ld, MNSC-EDI=%ld, TSTA=%fms (%d)\n",
now,
fct,
- eti.calculate_timestamp().c_str(),
- eti.seconds, eti.utco,
- now - eti.timestamp_in_unix_epoch(),
+ eti.timestamp.to_string().c_str(),
+ eti.timestamp.seconds, eti.timestamp.utco,
+ now - eti.timestamp.to_unix_epoch(),
ts->timestamp_sec,
now - ts->timestamp_sec,
- ts->timestamp_sec - eti.timestamp_in_unix_epoch()
+ ts->timestamp_sec - eti.timestamp.to_unix_epoch(),
+ ts->timestamp_pps / 16384.0,
+ ts->timestamp_pps
);
}
+static void dump_buf(const uint8_t *data, size_t len)
+{
+ static FILE* adler_fd = nullptr;
+ if (adler_fd == nullptr) {
+ adler_fd = fopen("/home/bram/dab/dabmux/adler32.edilib.txt", "w");
+ if (adler_fd == nullptr) {
+ abort();
+ }
+ }
+ constexpr uint32_t MOD_ADLER = 65521;
+ uint32_t a = 1, b = 0;
+ // Process each byte of the data in order
+ for (size_t index = 0; index < len; ++index) {
+ a = (a + data[index]) % MOD_ADLER;
+ b = (b + a) % MOD_ADLER;
+ }
+ uint32_t adler32 = (b << 16) | a;
+ fprintf(adler_fd, "Frame %zu %08X\n", len, adler32);
+ fflush(adler_fd);
+}
+
+static void write_outputs(
+ const options_t& options,
+ vector<uint8_t>&& frame,
+ deque<vector<uint8_t> >& aac_frames,
+ FILE* fd_out,
+ AACDecoder& aac_decoder)
+{
+ fwrite(frame.data(), frame.size(), 1, fd_out);
+
+ if (options.decode_aac) {
+ aac_frames.emplace_back(move(frame));
+
+ // Collect five frames to build one superframe
+ if (aac_frames.size() == 5) {
+ vector<uint8_t> superframe;
+ for (const auto& f : aac_frames) {
+ superframe.insert(superframe.end(), f.cbegin(), f.cend());
+ }
+#warning "debug this"
+ aac_decoder.decode_frame(superframe.data(), superframe.size());
+ aac_frames.clear();
+ }
+ }
+
+}
+
int main(int argc, char **argv)
{
if (argc == 1) {
@@ -210,6 +218,8 @@ int main(int argc, char **argv)
EdiDecoder::ETIWriter eti_writer;
EdiDecoder::ETIDecoder eti_decoder(eti_writer, options.verbose > 1);
+ deque<vector<uint8_t> > aac_frames;
+ AACDecoder aac_decoder("edilib-aac.wav");
EdiDecoder::STIWriter sti_writer;
EdiDecoder::STIDecoder sti_decoder(sti_writer, options.verbose > 1);
@@ -226,8 +236,9 @@ int main(int argc, char **argv)
const int udp_port = std::stoi(m[1].str());
fprintf(stderr, "Receiving from udp port %d\n", udp_port);
- UdpReceiver rx;
- rx.start(udp_port);
+ Socket::UDPReceiver rx;
+ const size_t max_packets_queued = 10;
+ rx.start(udp_port, "", "", max_packets_queued);
while (true) {
const auto &buf = rx.get_packet_buffer();
@@ -254,63 +265,54 @@ int main(int argc, char **argv)
std::vector<uint8_t> frame;
if (options.decode_sti) {
- frame = sti_writer.getFrame();
+ const auto sti = sti_writer.getFrame();
+ frame = move(sti.frame);
+ fprintf(stderr, "STI %s\n", sti.timestamp.to_string().c_str());
}
else {
const auto eti = eti_writer.getEtiFrame();
if (not eti.frame.empty() and options.verbose > 0) {
analyse_timestamp(ts_dec, eti_writer, eti);
}
- frame = eti.frame;
+ frame = move(eti.frame);
}
if (fd_out and not frame.empty()) {
- fwrite(frame.data(), frame.size(), 1, fd_out);
+ write_outputs(options, move(frame), aac_frames, fd_out,
+ aac_decoder);
}
}
}
- else if (std::regex_match(options.edi_source, m, re_tcp)) {
+ else if (std::regex_match(options.edi_source, m, re_tcp) /* TCP client */ ) {
const std::string hostname = m[1].str();
const int port = std::stoi(m[2].str());
+ Socket::TCPSocket sock;
- fprintf(stderr, "Receiving from tcp %s:%d\n", hostname.c_str(), port);
- int sock;
- if ((sock = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
- fprintf(stderr, "Could not create socket %s\n", strerror(errno));
- return -1;
- }
-
- struct sockaddr_in addr;
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
-
- hostent *host = gethostbyname(hostname.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
+ if (options.tcp_server) {
+ fprintf(stderr, "Listening on TCP %s:%d\n", hostname.c_str(), port);
+ Socket::TCPSocket listen_sock;
+ listen_sock.listen(port, hostname);
+ sock = listen_sock.accept(0);
}
else {
- fprintf(stderr, "Could not resolve hostname %s\n", strerror(errno));
- return -1;
- }
-
- ssize_t ret = ::connect(sock, (struct sockaddr*)&addr, sizeof(addr));
- if (ret == -1 and errno != EINPROGRESS) {
- fprintf(stderr, "Could not connect %s\n", strerror(errno));
- return -1;
+ fprintf(stderr, "Connecting to TCP %s:%d\n", hostname.c_str(), port);
+ sock.connect(hostname, port);
}
+ ssize_t ret = 0;
do {
- const size_t bufsize = 4096;
+ const size_t bufsize = 32;
std::vector<uint8_t> buf(bufsize);
- ret = ::recv(sock, buf.data(), buf.size(), 0);
+ ret = sock.recv(buf.data(), buf.size(), 0);
if (ret > 0) {
buf.resize(ret);
std::vector<uint8_t> frame;
if (options.decode_sti) {
sti_decoder.push_bytes(buf);
- frame = sti_writer.getFrame();
+ const auto sti = sti_writer.getFrame();
+ frame = move(sti.frame);
+ dump_buf(frame.data(), frame.size());
}
else {
eti_decoder.push_bytes(buf);
@@ -319,17 +321,14 @@ int main(int argc, char **argv)
if (not eti.frame.empty() and options.verbose > 0) {
analyse_timestamp(ts_dec, eti_writer, eti);
}
- frame = eti.frame;
+ frame = move(eti.frame);
}
if (fd_out and not frame.empty()) {
- fwrite(frame.data(), frame.size(), 1, fd_out);
+ write_outputs(options, move(frame), aac_frames, fd_out,
+ aac_decoder);
}
}
- else if (ret == -1) {
- fprintf(stderr, "Recv error %s\n", strerror(errno));
- return -1;
- }
} while (ret > 0);
}
@@ -345,24 +344,26 @@ int main(int argc, char **argv)
while (not feof(fd)) {
const size_t bufsize = 1024;
std::vector<uint8_t> buf(bufsize);
- int bytes_read = fread(buf.data(), bufsize, 1, fd);
+ int bytes_read = fread(buf.data(), 1, bufsize, fd);
if (bytes_read > 0) {
buf.resize(bytes_read);
std::vector<uint8_t> frame;
if (options.decode_sti) {
sti_decoder.push_bytes(buf);
- frame = sti_writer.getFrame();
+ const auto sti = sti_writer.getFrame();
+ frame = move(sti.frame);
}
else {
eti_decoder.push_bytes(buf);
const auto eti = eti_writer.getEtiFrame();
- frame = eti.frame;
+ frame = move(eti.frame);
}
if (fd_out and not frame.empty()) {
- fwrite(frame.data(), frame.size(), 1, fd_out);
+ write_outputs(options, move(frame), aac_frames, fd_out,
+ aac_decoder);
}
}
else {
diff --git a/test/wavfile.cpp b/test/wavfile.cpp
new file mode 100644
index 0000000..4bd4bd6
--- /dev/null
+++ b/test/wavfile.cpp
@@ -0,0 +1,271 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2009 Martin Storsjo
+ * Copyright (C) 2017 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#include "wavfile.h"
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <cstdint>
+#include <stdexcept>
+
+#define TAG(a, b, c, d) (((a) << 24) | ((b) << 16) | ((c) << 8) | (d))
+
+struct wav_reader {
+ FILE *wav;
+ uint32_t data_length;
+
+ int format;
+ int sample_rate;
+ int bits_per_sample;
+ int channels;
+ int byte_rate;
+ int block_align;
+
+ int streamed;
+};
+
+static uint32_t read_tag(struct wav_reader* wr) {
+ uint32_t tag = 0;
+ tag = (tag << 8) | fgetc(wr->wav);
+ tag = (tag << 8) | fgetc(wr->wav);
+ tag = (tag << 8) | fgetc(wr->wav);
+ tag = (tag << 8) | fgetc(wr->wav);
+ return tag;
+}
+
+static uint32_t read_int32(struct wav_reader* wr) {
+ uint32_t value = 0;
+ value |= fgetc(wr->wav) << 0;
+ value |= fgetc(wr->wav) << 8;
+ value |= fgetc(wr->wav) << 16;
+ value |= fgetc(wr->wav) << 24;
+ return value;
+}
+
+static uint16_t read_int16(struct wav_reader* wr) {
+ uint16_t value = 0;
+ value |= fgetc(wr->wav) << 0;
+ value |= fgetc(wr->wav) << 8;
+ return value;
+}
+
+static void skip(FILE *f, int n) {
+ int i;
+ for (i = 0; i < n; i++)
+ fgetc(f);
+}
+
+void* wav_read_open(const char *filename) {
+ struct wav_reader* wr = (struct wav_reader*) malloc(sizeof(*wr));
+ long data_pos = 0;
+ memset(wr, 0, sizeof(*wr));
+
+ if (!strcmp(filename, "-"))
+ wr->wav = stdin;
+ else
+ wr->wav = fopen(filename, "rb");
+ if (wr->wav == NULL) {
+ free(wr);
+ return NULL;
+ }
+
+ while (1) {
+ uint32_t tag, tag2, length;
+ tag = read_tag(wr);
+ if (feof(wr->wav))
+ break;
+ length = read_int32(wr);
+ if (!length || length >= 0x7fff0000) {
+ wr->streamed = 1;
+ length = ~0;
+ }
+ if (tag != TAG('R', 'I', 'F', 'F') || length < 4) {
+ fseek(wr->wav, length, SEEK_CUR);
+ continue;
+ }
+ tag2 = read_tag(wr);
+ length -= 4;
+ if (tag2 != TAG('W', 'A', 'V', 'E')) {
+ fseek(wr->wav, length, SEEK_CUR);
+ continue;
+ }
+ // RIFF chunk found, iterate through it
+ while (length >= 8) {
+ uint32_t subtag, sublength;
+ subtag = read_tag(wr);
+ if (feof(wr->wav))
+ break;
+ sublength = read_int32(wr);
+ length -= 8;
+ if (length < sublength)
+ break;
+ if (subtag == TAG('f', 'm', 't', ' ')) {
+ if (sublength < 16) {
+ // Insufficient data for 'fmt '
+ break;
+ }
+ wr->format = read_int16(wr);
+ wr->channels = read_int16(wr);
+ wr->sample_rate = read_int32(wr);
+ wr->byte_rate = read_int32(wr);
+ wr->block_align = read_int16(wr);
+ wr->bits_per_sample = read_int16(wr);
+ if (wr->format == 0xfffe) {
+ if (sublength < 28) {
+ // Insufficient data for waveformatex
+ break;
+ }
+ skip(wr->wav, 8);
+ wr->format = read_int32(wr);
+ skip(wr->wav, sublength - 28);
+ } else {
+ skip(wr->wav, sublength - 16);
+ }
+ } else if (subtag == TAG('d', 'a', 't', 'a')) {
+ data_pos = ftell(wr->wav);
+ wr->data_length = sublength;
+ if (!wr->data_length || wr->streamed) {
+ wr->streamed = 1;
+ return wr;
+ }
+ fseek(wr->wav, sublength, SEEK_CUR);
+ } else {
+ skip(wr->wav, sublength);
+ }
+ length -= sublength;
+ }
+ if (length > 0) {
+ // Bad chunk?
+ fseek(wr->wav, length, SEEK_CUR);
+ }
+ }
+ fseek(wr->wav, data_pos, SEEK_SET);
+ return wr;
+}
+
+void wav_read_close(void* obj) {
+ struct wav_reader* wr = (struct wav_reader*) obj;
+ if (wr->wav != stdin)
+ fclose(wr->wav);
+ free(wr);
+}
+
+int wav_get_header(void* obj, int* format, int* channels, int* sample_rate, int* bits_per_sample, unsigned int* data_length) {
+ struct wav_reader* wr = (struct wav_reader*) obj;
+ if (format)
+ *format = wr->format;
+ if (channels)
+ *channels = wr->channels;
+ if (sample_rate)
+ *sample_rate = wr->sample_rate;
+ if (bits_per_sample)
+ *bits_per_sample = wr->bits_per_sample;
+ if (data_length)
+ *data_length = wr->data_length;
+ return wr->format && wr->sample_rate;
+}
+
+int wav_read_data(void* obj, unsigned char* data, unsigned int length) {
+ struct wav_reader* wr = (struct wav_reader*) obj;
+ int n;
+ if (wr->wav == NULL)
+ return -1;
+ if (length > wr->data_length && !wr->streamed)
+ length = wr->data_length;
+ n = fread(data, 1, length, wr->wav);
+ wr->data_length -= length;
+ return n;
+}
+
+//============== WAV writer functions
+
+struct wavfile_header {
+ char riff_tag[4];
+ int riff_length;
+ char wave_tag[4];
+ char fmt_tag[4];
+ int fmt_length;
+ short audio_format;
+ short num_channels;
+ int sample_rate;
+ int byte_rate;
+ short block_align;
+ short bits_per_sample;
+ char data_tag[4];
+ int data_length;
+};
+
+WavWriter::WavWriter(const char *filename)
+{
+ m_fd = fopen(filename, "w+");
+ if (not m_fd) {
+ throw std::runtime_error("Could not open wav file");
+ }
+}
+
+void WavWriter::initialise_header(int rate, int channels)
+{
+ struct wavfile_header header;
+
+ int samples_per_second = rate;
+ int bits_per_sample = 16;
+
+ strncpy(header.riff_tag,"RIFF",4);
+ strncpy(header.wave_tag,"WAVE",4);
+ strncpy(header.fmt_tag,"fmt ",4);
+ strncpy(header.data_tag,"data",4);
+
+ header.riff_length = 0;
+ header.fmt_length = 16;
+ header.audio_format = 1;
+ header.num_channels = channels;
+ header.sample_rate = samples_per_second;
+ header.byte_rate = samples_per_second*(bits_per_sample/8)*channels;
+ header.block_align = channels*bits_per_sample/8;
+ header.bits_per_sample = bits_per_sample;
+ header.data_length = 0;
+
+ fwrite(&header,sizeof(header),1,m_fd);
+
+ fflush(m_fd);
+}
+
+WavWriter::~WavWriter()
+{
+ // The wav file header contains the full file size, we must
+ // write this at the end
+
+ int file_length = ftell(m_fd);
+
+ int data_length = file_length - sizeof(struct wavfile_header);
+ fseek(m_fd,sizeof(struct wavfile_header) - sizeof(int),SEEK_SET);
+ fwrite(&data_length,sizeof(data_length),1,m_fd);
+
+ int riff_length = file_length - 8;
+ fseek(m_fd,4,SEEK_SET);
+ fwrite(&riff_length,sizeof(riff_length),1,m_fd);
+
+ fclose(m_fd);
+}
+
+void WavWriter::write_data(const uint8_t *data, int length)
+{
+ fwrite(data,sizeof(uint8_t),length,m_fd);
+}
+
diff --git a/test/wavfile.h b/test/wavfile.h
new file mode 100644
index 0000000..6d68053
--- /dev/null
+++ b/test/wavfile.h
@@ -0,0 +1,45 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2009 Martin Storsjo
+ * Copyright (C) 2018 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#pragma once
+
+#include <cstdio>
+#include <cstdint>
+
+void* wav_read_open(const char *filename);
+void wav_read_close(void* obj);
+
+int wav_get_header(void* obj, int* format, int* channels, int* sample_rate, int* bits_per_sample, unsigned int* data_length);
+int wav_read_data(void* obj, unsigned char* data, unsigned int length);
+
+class WavWriter {
+ public:
+ WavWriter(const char *filename);
+ ~WavWriter();
+ WavWriter(const WavWriter& other) = delete;
+ WavWriter& operator=(const WavWriter& other) = delete;
+
+ void initialise_header(int rate, int channels);
+
+ void write_data(const uint8_t *data, int length);
+
+ private:
+ FILE *m_fd = nullptr;
+};
+