diff --git a/netutils/paho_mqtt/CMakeLists.txt b/netutils/paho_mqtt/CMakeLists.txt new file mode 100644 index 00000000000..d3360390b55 --- /dev/null +++ b/netutils/paho_mqtt/CMakeLists.txt @@ -0,0 +1,166 @@ +# ############################################################################## +# apps/netutils/paho_mqtt/CMakeLists.txt +# +# Licensed to the Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. The ASF licenses this +# file to you under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +# +# ############################################################################## + +if(CONFIG_LIB_MQTT5) + + # ############################################################################ + # Config and Fetch Paho MQTT C library + # ############################################################################ + + set(PAHO_MQTT_DIR ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt) + + if(NOT EXISTS ${PAHO_MQTT_DIR}) + # Default version if not specified in config + if(DEFINED CONFIG_NETUTILS_PAHO_MQTT_VERSION) + set(PAHO_MQTT_VERSION ${CONFIG_NETUTILS_PAHO_MQTT_VERSION}) + else() + set(PAHO_MQTT_VERSION "1.3.15") + endif() + + set(PAHO_MQTT_URL "https://github.com/eclipse-paho/paho.mqtt.c/archive") + FetchContent_Declare( + paho_mqtt_fetch + URL ${PAHO_MQTT_URL}/v${PAHO_MQTT_VERSION}.zip SOURCE_DIR ${PAHO_MQTT_DIR} + BINARY_DIR ${CMAKE_BINARY_DIR}/apps/netutils/paho_mqtt/paho_mqtt + DOWNLOAD_NO_PROGRESS true + TIMEOUT 30) + + FetchContent_GetProperties(paho_mqtt_fetch) + + if(NOT paho_mqtt_fetch_POPULATED) + FetchContent_Populate(paho_mqtt_fetch) + + # GitHub ZIP files extract with a versioned top-level directory Move + # contents from paho.mqtt.c- to paho_mqtt if needed + file(GLOB extracted_dirs "${PAHO_MQTT_DIR}/paho.mqtt.c-*") + if(extracted_dirs) + list(GET extracted_dirs 0 extracted_dir) + # Move all contents from the versioned directory to paho_mqtt + file(GLOB extracted_contents "${extracted_dir}/*") + foreach(item ${extracted_contents}) + get_filename_component(item_name ${item} NAME) + execute_process( + COMMAND ${CMAKE_COMMAND} -E rename ${item} + ${PAHO_MQTT_DIR}/${item_name} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + endforeach() + # Remove the empty versioned directory + execute_process( + COMMAND ${CMAKE_COMMAND} -E remove_directory ${extracted_dir} + WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR}) + message("Moved contents from versioned directory to paho_mqtt") + endif() + + # Remove downloaded zip file if exists in current directory + file(GLOB zip_files "${CMAKE_CURRENT_LIST_DIR}/v${PAHO_MQTT_VERSION}.zip") + if(zip_files) + file(REMOVE ${zip_files}) + message("Removed downloaded zip file") + endif() + endif() + + # Apply paho_mqtt_01 patch if exists + if(EXISTS ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt_01.patch) + execute_process( + COMMAND + sh -c + "patch -p1 --forward --ignore-whitespace < ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt_01.patch || true" + WORKING_DIRECTORY ${PAHO_MQTT_DIR}) + message("paho_mqtt_01 patching done") + endif() + + message("paho_mqtt download done") + endif() + + configure_file(paho_mqtt/src/VersionInfo.h.in + ${CMAKE_CURRENT_BINARY_DIR}/VersionInfo.h @ONLY) + + set(MQTT5_INCDIR ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt/src) + list(APPEND MQTT5_INCDIR ${CMAKE_CURRENT_BINARY_DIR}) + + file(GLOB CSRCS paho_mqtt/src/*.c) + + if(CONFIG_UTILS_MQTT5) + list(APPEND CSRCS + ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt/src/samples/pubsub_opts.c) + endif() + + if(CONFIG_OPENSSL_MBEDTLS_WRAPPER) + list(REMOVE_ITEM CSRCS ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt/src/SHA1.c) + endif() + + list(REMOVE_ITEM CSRCS ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt/src/MQTTClient.c + ${CMAKE_CURRENT_LIST_DIR}/paho_mqtt/src/MQTTVersion.c) + + nuttx_add_library(mqtt5) + + target_sources(mqtt5 PRIVATE ${CSRCS}) + + target_include_directories(mqtt5 PRIVATE ${MQTT5_INCDIR}) + + target_compile_options(mqtt5 PRIVATE ${MQTT5_FLAGS}) + + if(CONFIG_UTILS_MQTT5) + + set(MQTT_PUB_FLAGS + -DmessageArrived=mqtt_pub_messageArrived + -DonDisconnect=mqtt_pub_onDisconnect + -DonConnectFailure5=mqtt_pub_onConnectFailure5 + -DonConnectFailure=mqtt_pub_onConnectFailure + -DonConnect5=mqtt_pub_onConnect5 + -DonConnect=mqtt_pub_onConnect + -Dmysleep=mqtt_pub_mysleep + -DtoStop=mqtt_pub_toStop + -Dopts=mqtt_pub_opts + -Dmyconnect=mqtt_pub_myconnect + -Dcfinish=mqtt_pub_cfinish + -Dtrace_callback=mqtt_pub_trace_callback) + + nuttx_add_application( + NAME + mqtt_pub + SRCS + paho_mqtt/src/samples/paho_c_pub.c + DEPENDS + mqtt5 + INCLUDE_DIRECTORIES + ${MQTT5_INCDIR} + COMPILE_FLAGS + ${MQTT_PUB_FLAGS} + STACKSIZE + ${CONFIG_UTILS_MQTT5_STACKSIZE} + PRIORITY + ${CONFIG_UTILS_MQTT5_PRIORITY}) + + nuttx_add_application( + NAME + mqtt_sub + SRCS + paho_mqtt/src/samples/paho_c_sub.c + DEPENDS + mqtt5 + INCLUDE_DIRECTORIES + ${MQTT5_INCDIR} + STACKSIZE + ${CONFIG_UTILS_MQTT5_STACKSIZE} + PRIORITY + ${CONFIG_UTILS_MQTT5_PRIORITY}) + endif() +endif() diff --git a/netutils/paho_mqtt/Kconfig b/netutils/paho_mqtt/Kconfig new file mode 100644 index 00000000000..987fccb765b --- /dev/null +++ b/netutils/paho_mqtt/Kconfig @@ -0,0 +1,37 @@ +# +# Copyright (C) 2020 Xiaomi Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +config LIB_MQTT5 + bool "Enable mqtt5" + default n + ---help--- + A library for accessing mqtt5 client services through C libraries calls in a simple manner. + +config UTILS_MQTT5 + tristate "Enable mqtt5 tool" + depends on LIB_MQTT5 + ---help--- + Enable mqtt utility + +if UTILS_MQTT5 +config UTILS_MQTT5_PRIORITY + int "mqtt utility priority" + default 100 + +config UTILS_MQTT5_STACKSIZE + int "mqtt utility statcksize" + default 16384 +endif diff --git a/netutils/paho_mqtt/Make.defs b/netutils/paho_mqtt/Make.defs new file mode 100644 index 00000000000..a9177797693 --- /dev/null +++ b/netutils/paho_mqtt/Make.defs @@ -0,0 +1,23 @@ +############################################################################ +# apps/netutils/paho_mqtt/Make.defs +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +ifneq ($(CONFIG_LIB_MQTT5),) +CONFIGURED_APPS += $(APPDIR)/netutils/paho_mqtt +endif diff --git a/netutils/paho_mqtt/Makefile b/netutils/paho_mqtt/Makefile new file mode 100644 index 00000000000..21c9d904f5f --- /dev/null +++ b/netutils/paho_mqtt/Makefile @@ -0,0 +1,111 @@ +############################################################################ +# apps/netutils/paho_mqtt/Makefile +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. The +# ASF licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +############################################################################ + +include $(APPDIR)/Make.defs + +PAHO_MQTT_URL ?= "https://github.com/eclipse-paho/paho.mqtt.c/archive" + +# Default version if not specified in config +ifdef CONFIG_NETUTILS_PAHO_MQTT_VERSION +PAHO_MQTT_VERSION := $(patsubst "%",%,$(CONFIG_NETUTILS_PAHO_MQTT_VERSION)) +else +PAHO_MQTT_VERSION := 1.3.15 +endif + +PAHO_MQTT_ZIP = v$(PAHO_MQTT_VERSION).zip +PAHO_MQTT_UNPACK = paho_mqtt + +SRCDIR = $(APPDIR)/netutils/paho_mqtt/$(PAHO_MQTT_UNPACK)/src + +ifeq ($(CONFIG_LIB_MQTT5), y) + +# Check if paho_mqtt directory exists, if not download and extract +ifeq ($(wildcard $(PAHO_MQTT_UNPACK)/src),) +$(PAHO_MQTT_ZIP): + $(Q) echo "Downloading paho.mqtt.c-$(PAHO_MQTT_VERSION)" + $(Q) curl -L -o $(PAHO_MQTT_ZIP) $(PAHO_MQTT_URL)/$(PAHO_MQTT_ZIP) + +$(PAHO_MQTT_UNPACK): $(PAHO_MQTT_ZIP) + $(Q) echo "Unpacking: $(PAHO_MQTT_ZIP) -> $(PAHO_MQTT_UNPACK)" + $(Q) unzip -q $(PAHO_MQTT_ZIP) + $(Q) mv paho.mqtt.c-$(PAHO_MQTT_VERSION) $(PAHO_MQTT_UNPACK) + $(Q) rm -f $(PAHO_MQTT_ZIP) + $(Q) if [ -f paho_mqtt_01.patch ]; then \ + echo "Applying paho_mqtt_01 patch to $(PAHO_MQTT_UNPACK)"; \ + cd $(PAHO_MQTT_UNPACK) && patch -p1 --forward --ignore-whitespace < ../paho_mqtt_01.patch || true; \ + fi + $(Q) touch $(PAHO_MQTT_UNPACK) + +context:: $(PAHO_MQTT_UNPACK) + +distclean:: + $(call DELFILE, $(PAHO_MQTT_ZIP)) + $(call DELDIR, $(PAHO_MQTT_UNPACK)) +endif + +ifeq ($(CONFIG_OPENSSL_MBEDTLS_WRAPPER), y) +SKIP = $(SRCDIR)/SHA1.c +endif + +SKIP += $(SRCDIR)/MQTTClient.c +SKIP += $(SRCDIR)/MQTTVersion.c + +CSRCS = $(filter-out $(SKIP), $(wildcard $(SRCDIR)/*.c)) + +ifeq ($(CONFIG_UTILS_MQTT5),y) +CSRCS += $(SRCDIR)/samples/pubsub_opts.c +endif + +VARS = BUILD_TIMESTAMP PROJECT_VERSION PROJECT_VERSION_MAJOR +VARS += PROJECT_VERSION_MINOR PROJECT_VERSION_PATCH + +MQTT5_VERSION = $(SRCDIR)/VersionInfo.h + +SED_COMMANDS = $(foreach var,$(VARS),-e 's/@$(var)@/$($(var))/g') + +$(MQTT5_VERSION): $(SRCDIR)/VersionInfo.h.in + sed $(SED_COMMANDS) $< > $@ + +context:: $(MQTT5_VERSION) + +distclean:: + $(call DELFILE, $(MQTT5_VERSION)) + +ifeq ($(CONFIG_UTILS_MQTT5), y) +PROGNAME = mqtt_pub mqtt_sub +MAINSRC = $(SRCDIR)/samples/paho_c_pub.c $(SRCDIR)/samples/paho_c_sub.c +PRIORITY = $(CONFIG_UTILS_MQTT5_PRIORITY) +STACKSIZE = $(CONFIG_UTILS_MQTT5_STACKSIZE) +MODULE = $(CONFIG_UTILS_MQTT5) +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DmessageArrived=mqtt_pub_messageArrived +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DonDisconnect=mqtt_pub_onDisconnect +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DonConnectFailure5=mqtt_pub_onConnectFailure5 +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DonConnectFailure=mqtt_pub_onConnectFailure +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DonConnect5=mqtt_pub_onConnect5 +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -DonConnect=mqtt_pub_onConnect +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -Dmysleep=mqtt_pub_mysleep -DtoStop=mqtt_pub_toStop +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -Dopts=mqtt_pub_opts -Dmyconnect=mqtt_pub_myconnect +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -Dcfinish=mqtt_pub_cfinish +$(SRCDIR)/samples/paho_c_pub.c_CFLAGS += -Dtrace_callback=mqtt_pub_trace_callback +endif + +endif + +include $(APPDIR)/Application.mk diff --git a/netutils/paho_mqtt/paho_mqtt_01.patch b/netutils/paho_mqtt/paho_mqtt_01.patch new file mode 100644 index 00000000000..67a1fccdf69 --- /dev/null +++ b/netutils/paho_mqtt/paho_mqtt_01.patch @@ -0,0 +1,314 @@ +From 36c02a3935382a32a7fc19d52c9d6ee602dec8d6 Mon Sep 17 00:00:00 2001 +From: zhangshuai39 +Date: Wed, 14 May 2025 11:58:02 +0800 +Subject: [PATCH] paho_mqtt: Fix mqtt compile warning + +Signed-off-by: zhangshuai39 +--- + src/MQTTAsync.c | 2 +- + src/MQTTPacket.h | 2 ++ + src/SHA1.c | 2 +- + src/Socket.c | 16 +++++++++++++++- + src/Socket.h | 1 + + src/Thread.c | 1 + + src/Thread.h | 2 +- + src/WebSocket.c | 1 - + src/WebSocket.h | 2 +- + src/samples/paho_c_pub.c | 29 ++++++++++++++++++----------- + src/samples/paho_c_sub.c | 4 +++- + 11 files changed, 45 insertions(+), 18 deletions(-) + +diff --git a/src/MQTTAsync.c b/src/MQTTAsync.c +index c548ae3..2487d69 100644 +--- a/src/MQTTAsync.c ++++ b/src/MQTTAsync.c +@@ -989,7 +989,7 @@ exit: + } + + +-int MQTTAsync_inCallback() ++int MQTTAsync_inCallback(void) + { + thread_id_type thread_id = Paho_thread_getid(); + return thread_id == sendThread_id || thread_id == receiveThread_id; +diff --git a/src/MQTTPacket.h b/src/MQTTPacket.h +index fd384ae..c833d0c 100644 +--- a/src/MQTTPacket.h ++++ b/src/MQTTPacket.h +@@ -28,7 +28,9 @@ + #include "LinkedList.h" + #include "Clients.h" + ++#ifndef bool + typedef unsigned int bool; ++#endif + typedef void* (*pf)(int, unsigned char, char*, size_t); + + #include "MQTTProperties.h" +diff --git a/src/SHA1.c b/src/SHA1.c +index a1b2c3d..e4f5g6h 100644 +--- a/src/SHA1.c ++++ b/src/SHA1.c +@@ -58,7 +58,7 @@ + # include + # define htobe32(x) OSSwapHostToBigInt32(x) + # define be32toh(x) OSSwapBigToHostInt32(x) +-#elif defined(__FreeBSD__) || defined(__NetBSD__) ++#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__NuttX__) + # include + #endif + #include +diff --git a/src/Socket.c b/src/Socket.c +index ff36657..0ea9009 100644 +--- a/src/Socket.c ++++ b/src/Socket.c +@@ -130,7 +130,7 @@ int Socket_error(char* aString, SOCKET sock) + } + + #if !defined(_WIN32) +-void SIGPIPE_ignore() ++void SIGPIPE_ignore(void) + { + #if defined(PAHO_IGNORE_WITH_SIGNAL) + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) +@@ -182,6 +182,7 @@ void Socket_outInitialize(void) + mod_s.saved.fds_write = NULL; + mod_s.saved.fds_read = NULL; + mod_s.saved.nfds = 0; ++ mod_s.poll_in_progress = 0; + #endif + FUNC_EXIT; + } +@@ -622,17 +623,24 @@ SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* + } + + /* Check pending write set for writeable sockets */ ++ mod_s.poll_in_progress = 1; + rc1 = poll(mod_s.saved.fds_write, mod_s.saved.nfds, 0); ++ mod_s.poll_in_progress = 0; + if (rc1 > 0 && Socket_continueWrites(&sock, mutex) == SOCKET_ERROR) + { + *rc = SOCKET_ERROR; ++ mod_s.poll_in_progress = 0; + goto exit; + } + + /* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */ ++ + Paho_thread_unlock_mutex(mutex); ++ mod_s.poll_in_progress = 1; + *rc = poll(mod_s.saved.fds_read, mod_s.saved.nfds, timeout_ms); ++ mod_s.poll_in_progress = 0; + Paho_thread_lock_mutex(mutex); ++ + if (*rc == SOCKET_ERROR) + { + Socket_error("poll", 0); +@@ -1029,6 +1037,12 @@ int Socket_close(SOCKET socket) + int rc = 0; + + FUNC_ENTRY; ++ ++ /* Waiting for poll to end */ ++ while (mod_s.poll_in_progress) { ++ usleep(1000); // 1ms ++ } ++ + Paho_thread_lock_mutex(socket_mutex); + Socket_close_only(socket); + Socket_abortWrite(socket); +diff --git a/src/Socket.h b/src/Socket.h +index 6e52ab8..8afbdb6 100644 +--- a/src/Socket.h ++++ b/src/Socket.h +@@ -125,6 +125,7 @@ typedef struct + unsigned int nfds; /**< no of file descriptors for poll */ + struct pollfd* fds_read; /**< poll read file descriptors */ + struct pollfd* fds_write; ++ volatile int poll_in_progress; /**< Indicates poll operation is in progress (poll branch only) */ + + struct { + int cur_fd; /**< index into the fds_saved array */ +diff --git a/src/Thread.c b/src/Thread.c +index f4d43fb..52f810a 100644 +--- a/src/Thread.c ++++ b/src/Thread.c +@@ -72,6 +72,7 @@ void Paho_thread_start(thread_fn fn, void* parameter) + #else + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); ++ pthread_attr_setstacksize(&attr, CONFIG_UTILS_MQTT5_STACKSIZE); + if (pthread_create(&thread, &attr, fn, parameter) != 0) + thread = 0; + pthread_attr_destroy(&attr); +diff --git a/src/Thread.h b/src/Thread.h +index b0c823b..72dbc43 100644 +--- a/src/Thread.h ++++ b/src/Thread.h +@@ -76,7 +76,7 @@ LIBMQTT_API int Paho_thread_lock_mutex(mutex_type); + LIBMQTT_API int Paho_thread_unlock_mutex(mutex_type); + int Paho_thread_destroy_mutex(mutex_type); + +-LIBMQTT_API thread_id_type Paho_thread_getid(); ++LIBMQTT_API thread_id_type Paho_thread_getid(void); + + sem_type Thread_create_sem(int*); + int Thread_wait_sem(sem_type sem, int timeout); +diff --git a/src/WebSocket.c b/src/WebSocket.c +index 4338898..f941cda 100644 +--- a/src/WebSocket.c ++++ b/src/WebSocket.c +@@ -1136,7 +1136,6 @@ int WebSocket_receiveFrame(networkHandles *net, size_t *actual_len) + + if ( has_mask ) + { +- uint8_t mask[4]; + b = WebSocket_getRawSocketData(net, 4u, &len, &rcs); + if (rcs == SOCKET_ERROR) + { +diff --git a/src/WebSocket.h b/src/WebSocket.h +index 4e437d4..5343e6f 100644 +--- a/src/WebSocket.h ++++ b/src/WebSocket.h +@@ -60,7 +60,7 @@ int WebSocket_connect(networkHandles *net, int ssl, const char *uri); + /* obtain data from network socket */ + int WebSocket_getch(networkHandles *net, char* c); + char *WebSocket_getdata(networkHandles *net, size_t bytes, size_t* actual_len); +-size_t WebSocket_framePos(); ++size_t WebSocket_framePos(void); + void WebSocket_framePosSeekTo(size_t); + + /* send data out, in websocket format only if required */ +diff --git a/src/samples/paho_c_pub.c b/src/samples/paho_c_pub.c +index e891ef8..49392d8 100644 +--- a/src/samples/paho_c_pub.c ++++ b/src/samples/paho_c_pub.c +@@ -101,16 +101,12 @@ void onConnectFailure5(void* context, MQTTAsync_failureData5* response) + MQTTAsync_strerror(response->code), + MQTTReasonCode_toString(response->reasonCode)); + connected = -1; +- +- MQTTAsync client = (MQTTAsync)context; + } + + void onConnectFailure(void* context, MQTTAsync_failureData* response) + { + fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none"); + connected = -1; +- +- MQTTAsync client = (MQTTAsync)context; + } + + +@@ -140,6 +136,12 @@ void onConnect5(void* context, MQTTAsync_successData5* response) + } + } + ++ if (rc != 0) ++ { ++ fprintf(stderr, "Publish failed with code %d\n", rc); ++ toStop = 1; ++ } ++ + connected = 1; + } + +@@ -169,6 +171,12 @@ void onConnect(void* context, MQTTAsync_successData* response) + } + } + ++ if (rc != 0) ++ { ++ fprintf(stderr, "Publish failed with code %d\n", rc); ++ toStop = 1; ++ } ++ + connected = 1; + } + +@@ -219,7 +227,6 @@ void onPublish(void* context, MQTTAsync_successData* response) + + static int onSSLError(const char *str, size_t len, void *context) + { +- MQTTAsync client = (MQTTAsync)context; + return fprintf(stderr, "SSL error: %s\n", str); + } + +@@ -234,11 +241,11 @@ static unsigned int onPSKAuth(const char* hint, + int k, n; + + int rc = 0; +- struct pubsub_opts* opts = context; ++ struct pubsub_opts* local_opts = context; + + /* printf("Trying TLS-PSK auth with hint: %s\n", hint);*/ + +- if (opts->psk == NULL || opts->psk_identity == NULL) ++ if (local_opts->psk == NULL || local_opts->psk_identity == NULL) + { + /* printf("No PSK entered\n"); */ + goto exit; +@@ -246,7 +253,7 @@ static unsigned int onPSKAuth(const char* hint, + + /* psk should be array of bytes. This is a quick and dirty way to + * convert hex to bytes without input validation */ +- psk_len = (int)strlen(opts->psk) / 2; ++ psk_len = (int)strlen(local_opts->psk) / 2; + if (psk_len > max_psk_len) + { + fprintf(stderr, "PSK too long\n"); +@@ -254,11 +261,11 @@ static unsigned int onPSKAuth(const char* hint, + } + for (k=0, n=0; k < psk_len; k++, n += 2) + { +- sscanf(&opts->psk[n], "%2hhx", &psk[k]); ++ sscanf(&local_opts->psk[n], "%2hhx", &psk[k]); + } + + /* identity should be NULL terminated string */ +- strncpy(identity, opts->psk_identity, max_identity_len); ++ strncpy(identity, local_opts->psk_identity, max_identity_len); + if (identity[max_identity_len - 1] != '\0') + { + fprintf(stderr, "Identity too long\n"); +@@ -372,7 +379,6 @@ int main(int argc, char** argv) + char* url = NULL; + int url_allocated = 0; + int rc = 0; +- const char* version = NULL; + const char* program_name = "paho_c_pub"; + MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo(); + #if !defined(_WIN32) +@@ -506,6 +512,7 @@ int main(int argc, char** argv) + while (!disconnected) + mysleep(100); + ++ toStop = 0; + MQTTAsync_destroy(&client); + + if (url_allocated) +diff --git a/src/samples/paho_c_sub.c b/src/samples/paho_c_sub.c +index 85875c9..1fe18ce 100644 +--- a/src/samples/paho_c_sub.c ++++ b/src/samples/paho_c_sub.c +@@ -207,7 +207,6 @@ int main(int argc, char** argv) + MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; + int rc = 0; + char* url = NULL; +- const char* version = NULL; + const char* program_name = "paho_c_sub"; + MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo(); + #if !defined(_WIN32) +@@ -343,6 +342,9 @@ int main(int argc, char** argv) + mysleep(100); + + exit: ++ finished = 0; ++ subscribed = 0; ++ disconnected = 0; + MQTTAsync_destroy(&client); + + return EXIT_SUCCESS; +-- +2.34.1 +