aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-02-15 11:24:44 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-02-15 11:24:44 +0100
commitcb23ca65f4822fd7d4758a4fbb32607cef6f0ef2 (patch)
treeef59b359bc0e762dc17b1e8166418d058130e07c
parent02650f3f087ae3cc9fd6ae0aaf011aea2bf177df (diff)
downloadODR-EDI2EDI-cb23ca65f4822fd7d4758a4fbb32607cef6f0ef2.tar.gz
ODR-EDI2EDI-cb23ca65f4822fd7d4758a4fbb32607cef6f0ef2.tar.bz2
ODR-EDI2EDI-cb23ca65f4822fd7d4758a4fbb32607cef6f0ef2.zip
Common 4ad00b8: Use separate thread for EDI output spreading
-rw-r--r--lib/edioutput/Transport.cpp103
-rw-r--r--lib/edioutput/Transport.h12
2 files changed, 81 insertions, 34 deletions
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index fd7e2b7..136c71c 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -27,6 +27,7 @@
#include "Transport.h"
#include <iterator>
#include <cmath>
+#include <thread>
using namespace std;
@@ -97,11 +98,29 @@ Sender::Sender(const configuration_t& conf) :
edi_debug_file.open("./edi.debug");
}
+ if (m_conf.enable_pft) {
+ unique_lock<mutex> lock(m_mutex);
+ m_running = true;
+ m_thread = thread(&Sender::run, this);
+ }
+
if (m_conf.verbose) {
etiLog.log(info, "EDI output set up");
}
}
+Sender::~Sender()
+{
+ {
+ unique_lock<mutex> lock(m_mutex);
+ m_running = false;
+ }
+
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
void Sender::write(const TagPacket& tagpacket)
{
// Assemble into one AF Packet
@@ -128,11 +147,55 @@ void Sender::write(const TagPacket& tagpacket)
/* Separate insertion into map and transmission so as to make spreading possible */
const auto now = steady_clock::now();
- auto tp = now;
- for (auto& edi_frag : edi_fragments) {
- m_pending_frames[tp] = move(edi_frag);
- tp += inter_fragment_wait_time;
+ {
+ auto tp = now;
+ unique_lock<mutex> lock(m_mutex);
+ for (auto& edi_frag : edi_fragments) {
+ m_pending_frames[tp] = move(edi_frag);
+ tp += inter_fragment_wait_time;
+ }
+ }
+
+ // Transmission done in run() function
+ }
+ else /* PFT disabled */ {
+ // Send over ethernet
+ if (m_conf.dump) {
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ }
+
+ for (auto& dest : m_conf.destinations) {
+ if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
+
+ if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
+ fprintf(stderr, "EDI Output: AF packet larger than 1400,"
+ " consider using PFT to avoid UP fragmentation.\n");
+ m_udp_fragmentation_warning_printed = true;
+ }
+
+ udp_sockets.at(udp_dest.get())->send(af_packet, addr);
+ }
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
+ tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
+ }
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet);
+ }
+ else {
+ throw logic_error("EDI destination not implemented");
+ }
}
+ }
+}
+
+void Sender::run()
+{
+ while (m_running) {
+ unique_lock<mutex> lock(m_mutex);
+ const auto now = chrono::steady_clock::now();
// Send over ethernet
for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
@@ -167,37 +230,9 @@ void Sender::write(const TagPacket& tagpacket)
++it;
}
}
- }
- else /* PFT disabled */ {
- // Send over ethernet
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
- fprintf(stderr, "EDI Output: AF packet larger than 1400,"
- " consider using PFT to avoid UP fragmentation.\n");
- m_udp_fragmentation_warning_printed = true;
- }
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(af_packet);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
+ lock.unlock();
+ this_thread::sleep_for(chrono::microseconds(500));
}
}
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index 9ee4e27..3bcc2f4 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -39,6 +39,8 @@
#include <stdexcept>
#include <fstream>
#include <cstdint>
+#include <thread>
+#include <mutex>
namespace edi {
@@ -47,10 +49,15 @@ namespace edi {
class Sender {
public:
Sender(const configuration_t& conf);
+ Sender(const Sender&) = delete;
+ Sender operator=(const Sender&) = delete;
+ ~Sender();
void write(const TagPacket& tagpacket);
private:
+ void run();
+
bool m_udp_fragmentation_warning_printed = false;
configuration_t m_conf;
@@ -66,6 +73,11 @@ class Sender {
std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
+ // PFT spreading requires sending UDP packets at specific time, independently of
+ // time when write() gets called
+ std::thread m_thread;
+ std::mutex m_mutex;
+ bool m_running = false;
std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
};