diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-10-28 11:19:18 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-10-28 11:19:18 +0100 |
commit | 57c4e58ac139120a971dce184351256712984902 (patch) | |
tree | fb810fe9417b4c804e72e7fe243d99729b3949bf | |
parent | b79a56d3361f209d1d27a67325e1e5b33998cf8f (diff) | |
download | ODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.tar.gz ODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.tar.bz2 ODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.zip |
Autoreconnect and handle signals
-rw-r--r-- | lib/Socket.cpp | 9 | ||||
-rw-r--r-- | lib/Socket.h | 6 | ||||
-rw-r--r-- | src/main.cpp | 101 |
3 files changed, 86 insertions, 30 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bc1b179..d41ed1c 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -630,8 +630,13 @@ ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) { ssize_t ret = ::recv(m_sock, buffer, length, flags); if (ret == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive error: " + errstr); + if (errno == EINTR) { + throw Interrupted(); + } + else { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive error: " + errstr); + } } return ret; } diff --git a/lib/Socket.h b/lib/Socket.h index 8c6f8a9..8881be3 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -180,12 +180,12 @@ class TCPSocket { */ ssize_t send(const void* data, size_t size, int timeout_ms=0); - /* Returns number of bytes read, 0 on disconnect. Throws a - * runtime_error on error */ + class Interrupted {}; + /* Returns number of bytes read, 0 on disconnect. + * Throws Interrupted on EINTR, runtime_error on error */ ssize_t recv(void *buffer, size_t length, int flags); class Timeout {}; - class Interrupted {}; /* Returns number of bytes read, 0 on disconnect or refused connection. * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error * on error diff --git a/src/main.cpp b/src/main.cpp index 6cf14f8..3b1f30f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -21,15 +21,16 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#include <getopt.h> -#include <cmath> -#include <cstring> #include <chrono> #include <iostream> #include <iterator> #include <memory> #include <thread> #include <vector> +#include <cmath> +#include <cstring> +#include <signal.h> +#include <getopt.h> #include "Log.h" #include "EDISender.h" #include "edioutput/TagItems.h" @@ -41,6 +42,18 @@ using namespace std; constexpr long DEFAULT_BACKOFF = 5000; +volatile sig_atomic_t running = 1; + +void signal_handler(int signum) +{ + if (signum == SIGTERM) { + fprintf(stderr, "Received SIGTERM\n"); + exit(0); + } + //killpg(0, SIGPIPE); + running = 0; +} + static void usage() { cerr << "Usage:" << endl; @@ -89,10 +102,6 @@ class ReceivedTagItem : public edi::TagItem { class Main : public EdiDecoder::ETIDataCollector { public: - Main() : edi_decoder(*this) - { - } - // Tell the ETIWriter what EDI protocol we receive in *ptr. // This is not part of the ETI data, but is used as check virtual void update_protocol( @@ -105,11 +114,10 @@ class Main : public EdiDecoder::ETIDataCollector { dflc = fc_data.dflc; } + // Ignore most events because we are interested in retransmitting EDI, not + // decoding it virtual void update_fic(std::vector<uint8_t>&& fic) override { } virtual void update_err(uint8_t err) override { } - - // In addition to TSTA in ETI, EDI also transports more time - // stamp information. virtual void update_edi_time(uint32_t utco, uint32_t seconds) override { } virtual void update_mnsc(uint16_t mnsc) override { } virtual void update_rfu(uint16_t rfu) override { } @@ -249,36 +257,68 @@ class Main : public EdiDecoder::ETIDataCollector { return 1; } - edi_decoder.set_verbose(edi_conf.verbose); - etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " << (drop_late_packets ? "Will" : "Will not") << " drop late packets"; edisender.start(edi_conf, delay_ms, drop_late_packets); edisender.print_configuration(); + try { + while (running) { + EdiDecoder::ETIDecoder edi_decoder(*this); + + edi_decoder.set_verbose(edi_conf.verbose); + run(edi_decoder, connect_to_host, connect_to_port); + if (not running) { + break; + } + etiLog.level(info) << "Source disconnected, backoff " << backoff_after_reset_ms << "ms..."; + + // There is no state inside the edisender or inside Main that we need to + // clear. + + this_thread::sleep_for(chrono::milliseconds(backoff_after_reset_ms)); + } + } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Caught exception: " << e.what(); + return 1; + } + + return 0; + } + + private: + void run(EdiDecoder::ETIDecoder& edi_decoder, const string& connect_to_host, int connect_to_port) + { Socket::TCPSocket sock; etiLog.level(info) << "Connecting to TCP " << connect_to_host << ":" << connect_to_port; - sock.connect(connect_to_host, connect_to_port); + try { + sock.connect(connect_to_host, connect_to_port); + } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Error connecting to source: " << e.what(); + return; + } ssize_t ret = 0; do { const size_t bufsize = 32; std::vector<uint8_t> buf(bufsize); - ret = sock.recv(buf.data(), buf.size(), 0); - if (ret > 0) { - buf.resize(ret); - std::vector<uint8_t> frame; - edi_decoder.push_bytes(buf); + try { + ret = sock.recv(buf.data(), buf.size(), 0); + if (ret > 0) { + buf.resize(ret); + std::vector<uint8_t> frame; + edi_decoder.push_bytes(buf); + } } - } while (ret > 0); - - return 0; + catch (const Socket::TCPSocket::Interrupted&) { + } + } while (running and ret > 0); } - private: - - void add_edi_destination(void) + void add_edi_destination() { if (not dest_addr_set) { throw std::runtime_error("Destination address not specified for destination number " + @@ -340,7 +380,6 @@ class Main : public EdiDecoder::ETIDataCollector { bool ttl_set = false; bool dest_addr_set = false; edi::configuration_t edi_conf; - EdiDecoder::ETIDecoder edi_decoder; int delay_ms = 500; bool drop_late_packets = false; uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; @@ -365,6 +404,18 @@ int main(int argc, char **argv) int ret = 1; + struct sigaction sa; + memset(&sa, 0, sizeof(struct sigaction)); + sa.sa_handler = &signal_handler; + + const int sigs[] = {SIGHUP, SIGQUIT, SIGINT, SIGTERM}; + for (int sig : sigs) { + if (sigaction(sig, &sa, nullptr) == -1) { + perror("sigaction"); + return EXIT_FAILURE; + } + } + try { Main m; ret = m.start(argc, argv); |