diff --git a/src/interfaces/csp_if_zmqhub.c b/src/interfaces/csp_if_zmqhub.c index 7976d1803..36c7f2a5f 100644 --- a/src/interfaces/csp_if_zmqhub.c +++ b/src/interfaces/csp_if_zmqhub.c @@ -155,6 +155,47 @@ int csp_zmqhub_init_w_endpoints(uint16_t addr, return_interface); } +static void csp_zmqhub_driver_destroy(zmq_driver_t ** _drv, const char * const publish_endpoint, const char * const subscribe_endpoint) { + + if (*_drv == NULL) { + return; + } + + zmq_driver_t * const drv = *_drv; + + int ret; + (void)ret; /* Silence unused variable warning (promoted to an error if -Werr) issued when building with NDEBUG (release with asserts turned off) */ + + if (subscribe_endpoint != NULL) { + ret = zmq_disconnect(drv->subscriber, subscribe_endpoint); + assert(ret == 0); + } + + if (publish_endpoint != NULL) { + ret = zmq_disconnect(drv->publisher, publish_endpoint); + assert(ret == 0); + } + + if (drv->subscriber) { + ret = zmq_close(drv->subscriber); + assert(ret == 0); + } + + if (drv->publisher) { + ret = zmq_close(drv->publisher); + assert(ret == 0); + } + + if (drv->context != NULL) { + ret = zmq_ctx_destroy(drv->context); + assert(ret == 0); + } + + free(drv); + *_drv = NULL; + +} + int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr, const uint16_t rxfilter[], unsigned int rxfilter_count, const char * publish_endpoint, @@ -165,7 +206,9 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr int ret; pthread_attr_t attributes; zmq_driver_t * drv = calloc(1, sizeof(*drv)); - assert(drv != NULL); + if (drv == NULL) { + return CSP_ERR_NOMEM; + } if (ifname == NULL) { ifname = CSP_ZMQHUB_IF_NAME; @@ -178,17 +221,26 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr drv->iface.addr = addr; drv->context = zmq_ctx_new(); - assert(drv->context != NULL); + if (drv->context == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } //csp_print("INIT %s: pub(tx): [%s], sub(rx): [%s], rx filters: %u", drv->iface.name, publish_endpoint, subscribe_endpoint, rxfilter_count); /* Publisher (TX) */ drv->publisher = zmq_socket(drv->context, ZMQ_PUB); - assert(drv->publisher != NULL); + if (drv->publisher == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } /* Subscriber (RX) */ drv->subscriber = zmq_socket(drv->context, ZMQ_SUB); - assert(drv->subscriber != NULL); + if (drv->subscriber == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } // subscribe to all packets - no filter ret = zmq_setsockopt(drv->subscriber, ZMQ_SUBSCRIBE, NULL, 0); @@ -196,9 +248,15 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr /* Connect to server */ ret = zmq_connect(drv->publisher, publish_endpoint); - assert(ret == 0); - zmq_connect(drv->subscriber, subscribe_endpoint); - assert(ret == 0); + if (ret) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_TIMEDOUT; + } + ret = zmq_connect(drv->subscriber, subscribe_endpoint); + if (ret) { + csp_zmqhub_driver_destroy(&drv, publish_endpoint, NULL); + return CSP_ERR_TIMEDOUT; + } /* Start RX thread */ ret = pthread_attr_init(&attributes); @@ -206,7 +264,10 @@ int csp_zmqhub_init_w_name_endpoints_rxfilter(const char * ifname, uint16_t addr ret = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); assert(ret == 0); ret = pthread_create(&drv->rx_thread, &attributes, csp_zmqhub_task, drv); - assert(ret == 0); + if (ret) { + csp_zmqhub_driver_destroy(&drv, publish_endpoint, subscribe_endpoint); + return CSP_ERR_DRIVER; + } /* Register interface */ csp_iflist_add(&drv->iface); @@ -229,7 +290,9 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add int ret; pthread_attr_t attributes; zmq_driver_t * drv = calloc(1, sizeof(*drv)); - assert(drv != NULL); + if (drv == NULL) { + return CSP_ERR_NOMEM; + } if (ifname == NULL) { ifname = CSP_ZMQHUB_IF_NAME; @@ -241,17 +304,26 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add drv->iface.nexthop = csp_zmqhub_tx; drv->context = zmq_ctx_new(); - assert(drv->context != NULL); + if (drv->context == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } csp_print(" ZMQ init %s: addr: %u, pub(tx): [%s], sub(rx): [%s]\n", drv->iface.name, addr, pub, sub); /* Publisher (TX) */ drv->publisher = zmq_socket(drv->context, ZMQ_PUB); - assert(drv->publisher != NULL); + if (drv->publisher == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } /* Subscriber (RX) */ drv->subscriber = zmq_socket(drv->context, ZMQ_SUB); - assert(drv->subscriber != NULL); + if (drv->subscriber == NULL) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_DRIVER; + } /* If shared secret key provided */ if (sec_key) { @@ -290,9 +362,15 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add /* Connect to server */ ret = zmq_connect(drv->publisher, pub); - assert(ret == 0); + if (ret) { + csp_zmqhub_driver_destroy(&drv, NULL, NULL); + return CSP_ERR_TIMEDOUT; + } ret = zmq_connect(drv->subscriber, sub); - assert(ret == 0); + if (ret) { + csp_zmqhub_driver_destroy(&drv, pub, NULL); + return CSP_ERR_TIMEDOUT; + } if (promisc) { @@ -326,7 +404,10 @@ int csp_zmqhub_init_filter2(const char * ifname, const char * host, uint16_t add ret = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); assert(ret == 0); ret = pthread_create(&drv->rx_thread, &attributes, csp_zmqhub_task, drv); - assert(ret == 0); + if (ret) { + csp_zmqhub_driver_destroy(&drv, pub, sub); + return CSP_ERR_DRIVER; + } /* Register interface */ csp_iflist_add(&drv->iface);