aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Socket.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2020-04-21 15:56:26 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2020-04-21 15:56:26 +0200
commitcd5ef5af74b7dbee8c27549cd219a4ce5a0999c5 (patch)
treecbbf0814c367f39af3ce549fe76f6f82bf9a044f /lib/Socket.cpp
parenta92840837bff745b499264ad5180232a1a9fc5d2 (diff)
downloaddabmux-cd5ef5af74b7dbee8c27549cd219a4ce5a0999c5.tar.gz
dabmux-cd5ef5af74b7dbee8c27549cd219a4ce5a0999c5.tar.bz2
dabmux-cd5ef5af74b7dbee8c27549cd219a4ce5a0999c5.zip
Common 33a8362: EDI TCP output: handle disconnects
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r--lib/Socket.cpp70
1 files changed, 67 insertions, 3 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index bfbef93..159de7e 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)
// This suppresses the -Wlogical-op warning
#if EAGAIN == EWOULDBLOCK
- if (errno == EAGAIN) {
+ if (errno == EAGAIN)
#else
- if (errno == EAGAIN or errno == EWOULDBLOCK) {
+ if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
+ {
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()
m_running = false;
vector<uint8_t> termination_marker;
queue.push(termination_marker);
- m_sender_thread.join();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
}
void TCPConnection::process()
@@ -905,4 +908,65 @@ void TCPReceiveServer::process()
}
}
+TCPSendClient::TCPSendClient(const std::string& hostname, int port) :
+ m_hostname(hostname),
+ m_port(port),
+ m_running(true)
+{
+ m_sender_thread = std::thread(&TCPSendClient::process, this);
+}
+
+TCPSendClient::~TCPSendClient()
+{
+ m_running = false;
+ m_queue.trigger_wakeup();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
+}
+
+void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ m_queue.push(buffer);
+}
+
+void TCPSendClient::process()
+{
+ try {
+ while (m_running) {
+ if (m_is_connected) {
+ try {
+ vector<uint8_t> incoming;
+ m_queue.wait_and_pop(incoming);
+ if (m_sock.sendall(incoming.data(), incoming.size()) == -1) {
+ m_is_connected = false;
+ m_sock = TCPSocket();
+ }
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+ }
+ else {
+ try {
+ m_sock.connect(m_hostname, m_port);
+ m_is_connected = true;
+ }
+ catch (const runtime_error& e) {
+ m_is_connected = false;
+ this_thread::sleep_for(chrono::seconds(1));
+ }
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ m_exception_data = e.what();
+ m_running = false;
+ }
+}
+
}