aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2020-10-28 11:19:18 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2020-10-28 11:19:18 +0100
commit57c4e58ac139120a971dce184351256712984902 (patch)
treefb810fe9417b4c804e72e7fe243d99729b3949bf
parentb79a56d3361f209d1d27a67325e1e5b33998cf8f (diff)
downloadODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.tar.gz
ODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.tar.bz2
ODR-EDI2EDI-57c4e58ac139120a971dce184351256712984902.zip
Autoreconnect and handle signals
-rw-r--r--lib/Socket.cpp9
-rw-r--r--lib/Socket.h6
-rw-r--r--src/main.cpp101
3 files changed, 86 insertions, 30 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index bc1b179..d41ed1c 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -630,8 +630,13 @@ ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
{
ssize_t ret = ::recv(m_sock, buffer, length, flags);
if (ret == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive error: " + errstr);
+ if (errno == EINTR) {
+ throw Interrupted();
+ }
+ else {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
}
return ret;
}
diff --git a/lib/Socket.h b/lib/Socket.h
index 8c6f8a9..8881be3 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -180,12 +180,12 @@ class TCPSocket {
*/
ssize_t send(const void* data, size_t size, int timeout_ms=0);
- /* Returns number of bytes read, 0 on disconnect. Throws a
- * runtime_error on error */
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect.
+ * Throws Interrupted on EINTR, runtime_error on error */
ssize_t recv(void *buffer, size_t length, int flags);
class Timeout {};
- class Interrupted {};
/* Returns number of bytes read, 0 on disconnect or refused connection.
* Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
* on error
diff --git a/src/main.cpp b/src/main.cpp
index 6cf14f8..3b1f30f 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -21,15 +21,16 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
-#include <getopt.h>
-#include <cmath>
-#include <cstring>
#include <chrono>
#include <iostream>
#include <iterator>
#include <memory>
#include <thread>
#include <vector>
+#include <cmath>
+#include <cstring>
+#include <signal.h>
+#include <getopt.h>
#include "Log.h"
#include "EDISender.h"
#include "edioutput/TagItems.h"
@@ -41,6 +42,18 @@ using namespace std;
constexpr long DEFAULT_BACKOFF = 5000;
+volatile sig_atomic_t running = 1;
+
+void signal_handler(int signum)
+{
+ if (signum == SIGTERM) {
+ fprintf(stderr, "Received SIGTERM\n");
+ exit(0);
+ }
+ //killpg(0, SIGPIPE);
+ running = 0;
+}
+
static void usage()
{
cerr << "Usage:" << endl;
@@ -89,10 +102,6 @@ class ReceivedTagItem : public edi::TagItem {
class Main : public EdiDecoder::ETIDataCollector {
public:
- Main() : edi_decoder(*this)
- {
- }
-
// Tell the ETIWriter what EDI protocol we receive in *ptr.
// This is not part of the ETI data, but is used as check
virtual void update_protocol(
@@ -105,11 +114,10 @@ class Main : public EdiDecoder::ETIDataCollector {
dflc = fc_data.dflc;
}
+ // Ignore most events because we are interested in retransmitting EDI, not
+ // decoding it
virtual void update_fic(std::vector<uint8_t>&& fic) override { }
virtual void update_err(uint8_t err) override { }
-
- // In addition to TSTA in ETI, EDI also transports more time
- // stamp information.
virtual void update_edi_time(uint32_t utco, uint32_t seconds) override { }
virtual void update_mnsc(uint16_t mnsc) override { }
virtual void update_rfu(uint16_t rfu) override { }
@@ -249,36 +257,68 @@ class Main : public EdiDecoder::ETIDataCollector {
return 1;
}
- edi_decoder.set_verbose(edi_conf.verbose);
-
etiLog.level(info) << "Setting up EDI2EDI with delay " << delay_ms << " ms. " <<
(drop_late_packets ? "Will" : "Will not") << " drop late packets";
edisender.start(edi_conf, delay_ms, drop_late_packets);
edisender.print_configuration();
+ try {
+ while (running) {
+ EdiDecoder::ETIDecoder edi_decoder(*this);
+
+ edi_decoder.set_verbose(edi_conf.verbose);
+ run(edi_decoder, connect_to_host, connect_to_port);
+ if (not running) {
+ break;
+ }
+ etiLog.level(info) << "Source disconnected, backoff " << backoff_after_reset_ms << "ms...";
+
+ // There is no state inside the edisender or inside Main that we need to
+ // clear.
+
+ this_thread::sleep_for(chrono::milliseconds(backoff_after_reset_ms));
+ }
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Caught exception: " << e.what();
+ return 1;
+ }
+
+ return 0;
+ }
+
+ private:
+ void run(EdiDecoder::ETIDecoder& edi_decoder, const string& connect_to_host, int connect_to_port)
+ {
Socket::TCPSocket sock;
etiLog.level(info) << "Connecting to TCP " << connect_to_host << ":" << connect_to_port;
- sock.connect(connect_to_host, connect_to_port);
+ try {
+ sock.connect(connect_to_host, connect_to_port);
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Error connecting to source: " << e.what();
+ return;
+ }
ssize_t ret = 0;
do {
const size_t bufsize = 32;
std::vector<uint8_t> buf(bufsize);
- ret = sock.recv(buf.data(), buf.size(), 0);
- if (ret > 0) {
- buf.resize(ret);
- std::vector<uint8_t> frame;
- edi_decoder.push_bytes(buf);
+ try {
+ ret = sock.recv(buf.data(), buf.size(), 0);
+ if (ret > 0) {
+ buf.resize(ret);
+ std::vector<uint8_t> frame;
+ edi_decoder.push_bytes(buf);
+ }
}
- } while (ret > 0);
-
- return 0;
+ catch (const Socket::TCPSocket::Interrupted&) {
+ }
+ } while (running and ret > 0);
}
- private:
-
- void add_edi_destination(void)
+ void add_edi_destination()
{
if (not dest_addr_set) {
throw std::runtime_error("Destination address not specified for destination number " +
@@ -340,7 +380,6 @@ class Main : public EdiDecoder::ETIDataCollector {
bool ttl_set = false;
bool dest_addr_set = false;
edi::configuration_t edi_conf;
- EdiDecoder::ETIDecoder edi_decoder;
int delay_ms = 500;
bool drop_late_packets = false;
uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
@@ -365,6 +404,18 @@ int main(int argc, char **argv)
int ret = 1;
+ struct sigaction sa;
+ memset(&sa, 0, sizeof(struct sigaction));
+ sa.sa_handler = &signal_handler;
+
+ const int sigs[] = {SIGHUP, SIGQUIT, SIGINT, SIGTERM};
+ for (int sig : sigs) {
+ if (sigaction(sig, &sa, nullptr) == -1) {
+ perror("sigaction");
+ return EXIT_FAILURE;
+ }
+ }
+
try {
Main m;
ret = m.start(argc, argv);