aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Socket.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-25 10:50:23 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-25 10:50:23 +0200
commit03967733d70220e2de7af3cdad320aec5c82ede1 (patch)
tree4a1bd7adfb8825c95cfc1fa0c69f857aef234561 /lib/Socket.h
parent15d7ad8ac5bb187ac323da7dc30b9724b18c7df7 (diff)
downloaddabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.gz
dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.bz2
dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.zip
Add more EDI input improvements
Diffstat (limited to 'lib/Socket.h')
-rw-r--r--lib/Socket.h36
1 files changed, 31 insertions, 5 deletions
diff --git a/lib/Socket.h b/lib/Socket.h
index 2393584..8bb7fe1 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -127,7 +127,7 @@ class UDPSocket
/* Threaded UDP receiver */
class UDPReceiver {
public:
- UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ UDPReceiver();
~UDPReceiver();
UDPReceiver(const UDPReceiver&) = delete;
UDPReceiver operator=(const UDPReceiver&) = delete;
@@ -142,12 +142,12 @@ class UDPReceiver {
private:
void m_run(void);
- int m_port;
+ int m_port = 0;
std::string m_bindto;
std::string m_mcastaddr;
- size_t m_max_packets_queued;
+ size_t m_max_packets_queued = 1;
std::thread m_thread;
- std::atomic<bool> m_stop;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
ThreadsafeQueue<UDPPacket> m_packets;
UDPSocket m_sock;
};
@@ -254,7 +254,7 @@ class TCPDataDispatcher
void write(const std::vector<uint8_t>& data);
private:
- void process(void);
+ void process();
size_t m_max_queue_size;
@@ -265,4 +265,30 @@ class TCPDataDispatcher
std::list<TCPConnection> m_connections;
};
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
}