From 61cef6d90dcac0258cd96246ee76b882bf55b176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xuebin=20Su=20=28=E8=8B=8F=E5=AD=A6=E6=96=8C=29?= <12034000+xuebinsu@users.noreply.github.com> Date: Tue, 19 Apr 2022 16:54:37 +0800 Subject: [PATCH 1/7] Disregard dtx when pruning IC history (#13355) This patch aims to fix the memory leak issue #10314. Previously, the IC history table got pruned only after beginning a new distributed transaction (dtx). As a result, when a transaction contains many commands, or when a session never begins any distributed transaction, the history will never get pruned. This can cause a memory leak. This patch fixes this issue by reverting this part of code to the last version before dependency on distributed transaction was introduced. After that, we will only keep a constant-length IC history. This ensures that - The history table takes only constant amount of memory, and - The history is long enough to handle mismatched packets. --- contrib/interconnect/udp/ic_udpifc.c | 47 +++++++++++++--------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 548f71116c0..1404832c01c 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -340,13 +340,6 @@ struct ReceiveControlInfo /* Cursor history table. */ CursorICHistoryTable cursorHistoryTable; - - /* - * Last distributed transaction id when SetupUDPInterconnect is called. - * Coupled with cursorHistoryTable, it is used to handle multiple - * concurrent cursor cases. 
- */ - DistributedTransactionId lastDXatId; }; /* @@ -1867,7 +1860,6 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort) /* allocate a buffer for sending disorder messages */ rx_control_info.disorderBuffer = palloc0(MIN_PACKET_SIZE); - rx_control_info.lastDXatId = InvalidTransactionId; rx_control_info.lastTornIcId = 0; initCursorICHistoryTable(&rx_control_info.cursorHistoryTable); @@ -3547,29 +3539,32 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) if (Gp_role == GP_ROLE_DISPATCH) { - DistributedTransactionId distTransId = 0; - TransactionId localTransId = 0; - TransactionId subtransId = 0; - - GetAllTransactionXids(&(distTransId), - &(localTransId), - &(subtransId)); - - /* - * Prune only when we are not in the save transaction and there is a - * large number of entries in the table + /* + * Prune the history table if it is too large + * + * We only keep history of constant length so that + * - The history table takes only constant amount of memory. + * - It is long enough so that it is almost impossible to receive + * packets from an IC instance that is older than the first one + * in the history. 
*/ - if (distTransId != rx_control_info.lastDXatId && rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE)) + if (rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE)) { - if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) - elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id); - pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id); + uint32 prune_id = sliceTable->ic_instance_id - CURSOR_IC_TABLE_SIZE; + + /* + * Only prune if we didn't underflow -- also we want the prune id + * to be newer than the limit (hysteresis) + */ + if (prune_id < sliceTable->ic_instance_id) + { + if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) + elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id); + pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, prune_id); + } } addCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id, gp_command_count); - - /* save the latest transaction id. */ - rx_control_info.lastDXatId = distTransId; } /* now we'll do some setup for each of our Receiving Motion Nodes. */ From 07b2e6cb220ca0881669aa9d3abbeab247ad8fd1 Mon Sep 17 00:00:00 2001 From: Soumyadeep Chakraborty Date: Fri, 10 Jun 2022 09:32:21 -0700 Subject: [PATCH 2/7] UDP interconnect: Modernize IC socket setup This modernizes and refactors the UDP IC socket setup code somewhat, drawing inspiration from pgstat_init(), where similar UDP socket setup is performed. Significant changes: * Prefer pg_getaddrinfo_all() to getaddrinfo(). * Consistently use guarded logging for address retries. * Simplify interface for sizing UDP IC socket send/receive buffers. 
--- contrib/interconnect/udp/ic_faultinjection.h | 22 -- contrib/interconnect/udp/ic_udpifc.c | 328 ++++++++----------- 2 files changed, 139 insertions(+), 211 deletions(-) diff --git a/contrib/interconnect/udp/ic_faultinjection.h b/contrib/interconnect/udp/ic_faultinjection.h index c3aa69a2dcc..e3477adecc0 100644 --- a/contrib/interconnect/udp/ic_faultinjection.h +++ b/contrib/interconnect/udp/ic_faultinjection.h @@ -481,25 +481,6 @@ testmode_getsockname(const char *caller_name, int socket, return getsockname(socket, address, address_len); } -/* - * testmode_getsockopt - * getsockopt function with faults injected - */ -static int -testmode_getsockopt(const char *caller_name, int socket, int level, - int option_name, void *restrict option_value, - socklen_t *restrict option_len) -{ - if (FINC_HAS_FAULT(FINC_OS_NET_INTERFACE) && - testmode_inject_fault(gp_udpic_fault_inject_percent)) - { - write_log("inject fault to getsockopt: FINC_OS_NET_INTERFACE"); - errno = EFAULT; - return -1; - } - - return getsockopt(socket, level, option_name, option_value, option_len); -} /* * testmode_setsockopt @@ -657,9 +638,6 @@ testmode_pthread_create(const char *caller_name, pthread_t *thread, #define getsockname(socket, address, address_len) \ testmode_getsockname(PG_FUNCNAME_MACRO, socket, address, address_len) -#define getsockopt(socket, level, option_name, option_value, option_len) \ - testmode_getsockopt(PG_FUNCNAME_MACRO, socket, level, option_name, option_value, option_len) - #define setsockopt(socket, level, option_name, option_value, option_len) \ testmode_setsockopt(PG_FUNCNAME_MACRO, socket, level, option_name, option_value, option_len) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 1404832c01c..fa5f678b632 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -210,6 +210,8 @@ int #define UDPIC_FLAGS_DUPLICATE (64) #define UDPIC_FLAGS_CAPACITY (128) +#define UDPIC_MIN_BUF_SIZE (128 * 
1024) + /* * ConnHtabBin * @@ -734,8 +736,7 @@ static void resetRxThreadError(void); static void SendDummyPacket(void); static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort); -static void setXmitSocketOptions(int txfd); -static uint32 setSocketBufferSize(int fd, int type, int expectedSize, int leastSize); +static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type); static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily); static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates, ExecSlice *sendSlice, @@ -1572,41 +1573,33 @@ resetRxThreadError() static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily) { - int errnoSave; - int fd = -1; - const char *fun; - + struct addrinfo *addrs = NULL; + struct addrinfo *addr; + struct addrinfo hints; + int ret; + int ic_socket = PGINVALID_SOCKET; + struct sockaddr_storage ic_socket_addr; + int tries = 0; + struct sockaddr_storage listenerAddr; + socklen_t listenerAddrlen = sizeof(ic_socket_addr); + uint32 socketSendBufferSize; + uint32 socketRecvBufferSize; - /* - * At the moment, we don't know which of IPv6 or IPv4 is wanted, or even - * supported, so just ask getaddrinfo... - * - * Perhaps just avoid this and try socket with AF_INET6 and AF_INT? - * - * Most implementation of getaddrinfo are smart enough to only return a - * particular address family if that family is both enabled, and at least - * one network adapter has an IP address of that family. 
- */ - struct addrinfo hints; - struct addrinfo *addrs, - *rp; - int s; - struct sockaddr_storage our_addr; - socklen_t our_addr_len; - char service[32]; - - snprintf(service, 32, "%d", 0); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ - hints.ai_protocol = 0; /* Any protocol - UDP implied for network use due to SOCK_DGRAM */ + hints.ai_protocol = 0; + hints.ai_addrlen = 0; + hints.ai_addr = NULL; + hints.ai_canonname = NULL; + hints.ai_next = NULL; + hints.ai_flags |= AI_NUMERICHOST; #ifdef USE_ASSERT_CHECKING if (gp_udpic_network_disable_ipv6) hints.ai_family = AF_INET; #endif - fun = "getaddrinfo"; if (Gp_interconnect_address_type == INTERCONNECT_ADDRESS_TYPE_UNICAST) { Assert(interconnect_address && strlen(interconnect_address) > 0); @@ -1623,134 +1616,126 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil (errmsg("getaddrinfo called with wildcard address"))); } - s = getaddrinfo(interconnect_address, service, &hints, &addrs); - if (s != 0) - elog(ERROR, "getaddrinfo says %s", gai_strerror(s)); - /* - * getaddrinfo() returns a list of address structures, one for each valid - * address and family we can use. - * - * Try each address until we successfully bind. If socket (or bind) fails, - * we (close the socket and) try the next address. This can happen if the - * system supports IPv6, but IPv6 is disabled from working, or if it - * supports IPv6 and IPv4 is disabled. + * Restrict what IP address we will listen on to just the one that was + * used to create this QE session. 
*/ + Assert(interconnect_address && strlen(interconnect_address) > 0); + ret = pg_getaddrinfo_all(interconnect_address, NULL, &hints, &addrs); + if (ret || !addrs) + { + ereport(LOG, + (errmsg("could not resolve address for UDP IC socket %s: %s", + interconnect_address, + gai_strerror(ret)))); + goto startup_failed; + } /* - * If there is both an AF_INET6 and an AF_INET choice, we prefer the - * AF_INET6, because on UNIX it can receive either protocol, whereas - * AF_INET can only get IPv4. Otherwise we'd need to bind two sockets, - * one for each protocol. - * - * Why not just use AF_INET6 in the hints? That works perfect if we know - * this machine supports IPv6 and IPv6 is enabled, but we don't know that. + * On some platforms, pg_getaddrinfo_all() may return multiple addresses + * only one of which will actually work (eg, both IPv6 and IPv4 addresses + * when kernel will reject IPv6). Worse, the failure may occur at the + * bind() or perhaps even connect() stage. So we must loop through the + * results till we find a working combination. We will generate DEBUG + * messages, but no error, for bogus combinations. */ - -#ifndef __darwin__ -#ifdef HAVE_IPV6 - if (addrs->ai_family == AF_INET && addrs->ai_next != NULL && addrs->ai_next->ai_family == AF_INET6) + for (addr = addrs; addr != NULL; addr = addr->ai_next) { - /* - * We got both an INET and INET6 possibility, but we want to prefer - * the INET6 one if it works. Reverse the order we got from - * getaddrinfo so that we try things in our preferred order. If we got - * more possibilities (other AFs??), I don't think we care about them, - * so don't worry if the list is more that two, we just rearrange the - * first two. 
- */ - struct addrinfo *temp = addrs->ai_next; /* second node */ - addrs->ai_next = addrs->ai_next->ai_next; /* point old first node to - * third node if any */ - temp->ai_next = addrs; /* point second node to first */ - addrs = temp; /* start the list with the old second node */ - elog(DEBUG1, "Have both IPv6 and IPv4 choices"); - } -#endif +#ifdef HAVE_UNIX_SOCKETS + /* Ignore AF_UNIX sockets, if any are returned. */ + if (addr->ai_family == AF_UNIX) + continue; #endif - for (rp = addrs; rp != NULL; rp = rp->ai_next) - { - fun = "socket"; - - /* - * getaddrinfo gives us all the parameters for the socket() call as - * well as the parameters for the bind() call. - */ - elog(DEBUG1, "receive socket ai_family %d ai_socktype %d ai_protocol %d", rp->ai_family, rp->ai_socktype, rp->ai_protocol); - fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (fd == -1) - continue; - elog(DEBUG1, "receive socket %d ai_family %d ai_socktype %d ai_protocol %d", fd, rp->ai_family, rp->ai_socktype, rp->ai_protocol); + ereportif(++tries > 1 && gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + errmsg("trying another address for UDP interconnect socket")); - fun = "fcntl(O_NONBLOCK)"; - if (!pg_set_noblock(fd)) + ic_socket = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (ic_socket == PGINVALID_SOCKET) { - if (fd >= 0) - { - closesocket(fd); - fd = -1; - } + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not create UDP interconnect socket: %m"))); continue; } - fun = "bind"; - elog(DEBUG1, "bind addrlen %d fam %d", rp->ai_addrlen, rp->ai_addr->sa_family); - if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0) + /* + * Bind the socket to a kernel assigned ephemeral port on the + * interconnect_address. 
+ */ + if (bind(ic_socket, addr->ai_addr, addr->ai_addrlen) < 0) { - *txFamily = rp->ai_family; - break; /* Success */ + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not bind UDP interconnect socket: %m"))); + closesocket(ic_socket); + ic_socket = PGINVALID_SOCKET; + continue; } - if (fd >= 0) + /* Call getsockname() to eventually obtain the assigned ephemeral port */ + if (getsockname(ic_socket, (struct sockaddr *) &listenerAddr, &listenerAddrlen) < 0) { - closesocket(fd); - fd = -1; + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errcode_for_socket_access(), + errmsg("could not get address of socket for UDP interconnect: %m"))); + closesocket(ic_socket); + ic_socket = PGINVALID_SOCKET; + continue; } - } - if (rp == NULL) - { /* No address succeeded */ - goto error; + /* If we get here, we have a working socket */ + break; } - freeaddrinfo(addrs); /* No longer needed */ - - /* - * Get our socket address (IP and Port), which we will save for others to - * connected to. 
- */ - MemSet(&our_addr, 0, sizeof(our_addr)); - our_addr_len = sizeof(our_addr); - - fun = "getsockname"; - if (getsockname(fd, (struct sockaddr *) &our_addr, &our_addr_len) < 0) - goto error; + if (!addr || ic_socket == PGINVALID_SOCKET) + goto startup_failed; - Assert(our_addr.ss_family == AF_INET || our_addr.ss_family == AF_INET6); + /* Memorize the socket fd, kernel assigned port and address family */ + *listenerSocketFd = ic_socket; + if (listenerAddr.ss_family == AF_INET6) + { + *listenerPort = ntohs(((struct sockaddr_in6 *) &listenerAddr)->sin6_port); + *txFamily = AF_INET6; + } + else + { + *listenerPort = ntohs(((struct sockaddr_in *) &listenerAddr)->sin_port); + *txFamily = AF_INET; + } - *listenerSocketFd = fd; + /* Set up socket non-blocking mode */ + if (!pg_set_noblock(ic_socket)) + { + ereport(LOG, + (errcode_for_socket_access(), + errmsg("could not set UDP interconnect socket to nonblocking mode: %m"))); + goto startup_failed; + } - if (our_addr.ss_family == AF_INET6) - *listenerPort = ntohs(((struct sockaddr_in6 *) &our_addr)->sin6_port); - else - *listenerPort = ntohs(((struct sockaddr_in *) &our_addr)->sin_port); + /* Set up the socket's send and receive buffer sizes. 
*/ + socketRecvBufferSize = setUDPSocketBufferSize(ic_socket, SO_RCVBUF); + if (socketRecvBufferSize == -1) + goto startup_failed; + ic_control_info.socketRecvBufferSize = socketRecvBufferSize; - setXmitSocketOptions(fd); + socketSendBufferSize = setUDPSocketBufferSize(ic_socket, SO_SNDBUF); + if (socketSendBufferSize == -1) + goto startup_failed; + ic_control_info.socketSendBufferSize = socketSendBufferSize; + pg_freeaddrinfo_all(hints.ai_family, addrs); return; -error: - errnoSave = errno; - if (fd >= 0) - closesocket(fd); - errno = errnoSave; +startup_failed: + if (addrs) + pg_freeaddrinfo_all(hints.ai_family, addrs); + if (ic_socket != PGINVALID_SOCKET) + closesocket(ic_socket); ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect error: Could not set up udp listener socket"), - errdetail("%s: %m", fun))); - return; + errmsg("interconnect error: Could not set up udp interconnect socket: %m"))); } /* @@ -2550,81 +2535,46 @@ freeRxBuffer(RxBufferPool *p, icpkthdr *buf) } /* - * setSocketBufferSize - * Set socket buffer size. + * Set UDP IC send/receive socket buffer size. + * + * We must carefully size the UDP IC socket's send/receive buffers. If the size + * is too small, say 128K, and send queue depth and receive queue depth are + * large, then there might be a lot of dropped/reordered packets. We start + * trying from a size of 2MB (unless Gp_udp_bufsize_k is specified), and + * gradually back off to UDPIC_MIN_BUF_SIZE. For a given size setting to be + * successful, the corresponding UDP kernel buffer size params must be adequate. 
+ * */ static uint32 -setSocketBufferSize(int fd, int type, int expectedSize, int leastSize) +setUDPSocketBufferSize(int ic_socket, int buffer_type) { - int bufSize; - int errnoSave; - socklen_t skLen = 0; - const char *fun; + int expected_size; + int curr_size; + ACCEPT_TYPE_ARG3 option_len = 0; - fun = "getsockopt"; - skLen = sizeof(bufSize); - if (getsockopt(fd, SOL_SOCKET, type, (char *) &bufSize, &skLen) < 0) - goto error; + Assert(buffer_type == SO_SNDBUF || buffer_type == SO_RCVBUF); - elog(DEBUG1, "UDP-IC: xmit default buffer size %d bytes", bufSize); + expected_size = (Gp_udp_bufsize_k ? Gp_udp_bufsize_k * 1024 : 2048 * 1024); - /* - * We'll try the expected size first, and fall back to least size if that - * doesn't work. - */ - - bufSize = expectedSize; - fun = "setsockopt"; - while (setsockopt(fd, SOL_SOCKET, type, (const char *) &bufSize, skLen) < 0) + curr_size = expected_size; + option_len = sizeof(curr_size); + while (setsockopt(ic_socket, SOL_SOCKET, buffer_type, (const char *) &curr_size, option_len) < 0) { - bufSize = bufSize >> 1; - if (bufSize < leastSize) - goto error; + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errmsg("UDP-IC: setsockopt %s failed to set buffer size = %d bytes: %m", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size))); + curr_size = curr_size >> 1; + if (curr_size < UDPIC_MIN_BUF_SIZE) + return -1; } - elog(DEBUG1, "UDP-IC: xmit use buffer size %d bytes", bufSize); - - return bufSize; - -error: - errnoSave = errno; - if (fd >= 0) - closesocket(fd); - errno = errnoSave; - ereport(ERROR, - (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect error: Could not set up udp listener socket"), - errdetail("%s: %m", fun))); - /* Make GCC not complain. */ - return 0; -} - -/* - * setXmitSocketOptions - * Set transmit socket options. - */ -static void -setXmitSocketOptions(int txfd) -{ - uint32 bufSize = 0; - - /* - * The Gp_udp_bufsize_k guc should be set carefully. 
- * - * If it is small, such as 128K, and send queue depth and receive queue - * depth are large, then it is possible OS can not handle all of the UDP - * packets GPDB delivered to it. OS will introduce a lot of packet losses - * and disordered packets. - * - * In order to set Gp_udp_bufsize_k to a larger value, the OS UDP buffer - * should be set to a large enough value. - * - */ - bufSize = (Gp_udp_bufsize_k != 0 ? Gp_udp_bufsize_k * 1024 : 2048 * 1024); - - ic_control_info.socketRecvBufferSize = setSocketBufferSize(txfd, SO_RCVBUF, bufSize, 128 * 1024); - ic_control_info.socketSendBufferSize = setSocketBufferSize(txfd, SO_SNDBUF, bufSize, 128 * 1024); + ereportif(gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG, DEBUG3, + (errmsg("UDP-IC: socket %s current buffer size = %d bytes", + buffer_type == SO_SNDBUF ? "send": "receive", + curr_size))); + return curr_size; } #if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING) From d3afb2695fc584753afcc471a78430bb7da68c53 Mon Sep 17 00:00:00 2001 From: Hongxu Ma Date: Sun, 8 Oct 2023 09:47:57 +0800 Subject: [PATCH 3/7] Enhance prune behavior of IC_HISTROY_TABLE for the read-only transaction (#16458) Previous PR #13355 fixed issue #10314. But it introduced a new bug: IC-UDP may hang forever in some scenarios (e.g. lots of IC instances in a single UDF), please see the discussion in #13411. Enhancing it in this PR: * for the non-read-only transaction, keep the previous logic (before PR-13355) to prevent the new bug. * for the read-only transaction, introduce gp_interconnect_cursor_ic_table_size to configure the size of Cursor History Table as a workaround. 
--- contrib/interconnect/udp/ic_udpifc.c | 109 ++++++++++++++++++--------- src/backend/cdb/cdbvars.c | 1 + src/backend/utils/misc/guc_gp.c | 11 +++ src/include/cdb/cdbvars.h | 10 +++ src/include/utils/sync_guc_name.h | 1 + 5 files changed, 95 insertions(+), 37 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index fa5f678b632..1af4c4c9e67 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -248,17 +248,6 @@ struct ConnHashTable (a)->srcPid == (b)->srcPid && \ (a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId)) - -/* - * Cursor IC table definition. - * - * For cursor case, there may be several concurrent interconnect - * instances on QD. The table is used to track the status of the - * instances, which is quite useful for "ACK the past and NAK the future" paradigm. - * - */ -#define CURSOR_IC_TABLE_SIZE (128) - /* * CursorICHistoryEntry * @@ -291,8 +280,9 @@ struct CursorICHistoryEntry typedef struct CursorICHistoryTable CursorICHistoryTable; struct CursorICHistoryTable { + uint32 size; uint32 count; - CursorICHistoryEntry *table[CURSOR_IC_TABLE_SIZE]; + CursorICHistoryEntry **table; }; /* @@ -342,6 +332,13 @@ struct ReceiveControlInfo /* Cursor history table. */ CursorICHistoryTable cursorHistoryTable; + + /* + * Last distributed transaction id when SetupUDPInterconnect is called. + * Coupled with cursorHistoryTable, it is used to handle multiple + * concurrent cursor cases. 
+ */ + DistributedTransactionId lastDXatId; }; /* @@ -1335,8 +1332,13 @@ estimateRTT(MotionConn *mConn , uint32_t mrtt) static void initCursorICHistoryTable(CursorICHistoryTable *t) { + MemoryContext old; t->count = 0; - memset(t->table, 0, sizeof(t->table)); + t->size = Gp_interconnect_cursor_ic_table_size; + + old = MemoryContextSwitchTo(ic_control_info.memContext); + t->table = palloc0(sizeof(struct CursorICHistoryEntry *) * t->size); + MemoryContextSwitchTo(old); } /* @@ -1348,7 +1350,7 @@ addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid) { MemoryContext old; CursorICHistoryEntry *p; - uint32 index = icId % CURSOR_IC_TABLE_SIZE; + uint32 index = icId % t->size; old = MemoryContextSwitchTo(ic_control_info.memContext); p = palloc0(sizeof(struct CursorICHistoryEntry)); @@ -1378,7 +1380,7 @@ static void updateCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint8 status) { struct CursorICHistoryEntry *p; - uint8 index = icId % CURSOR_IC_TABLE_SIZE; + uint8 index = icId % t->size; for (p = t->table[index]; p; p = p->next) { @@ -1399,7 +1401,7 @@ static CursorICHistoryEntry * getCursorIcEntry(CursorICHistoryTable *t, uint32 icId) { struct CursorICHistoryEntry *p; - uint8 index = icId % CURSOR_IC_TABLE_SIZE; + uint8 index = icId % t->size; for (p = t->table[index]; p; p = p->next) { @@ -1421,7 +1423,7 @@ pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId) { uint8 index; - for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++) + for (index = 0; index < t->size; index++) { struct CursorICHistoryEntry *p, *q; @@ -1470,7 +1472,7 @@ purgeCursorIcEntry(CursorICHistoryTable *t) { uint8 index; - for (index = 0; index < CURSOR_IC_TABLE_SIZE; index++) + for (index = 0; index < t->size; index++) { struct CursorICHistoryEntry *trash; @@ -1845,6 +1847,7 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort) /* allocate a buffer for sending disorder messages */ rx_control_info.disorderBuffer = palloc0(MIN_PACKET_SIZE); + rx_control_info.lastDXatId 
= InvalidTransactionId; rx_control_info.lastTornIcId = 0; initCursorICHistoryTable(&rx_control_info.cursorHistoryTable); @@ -3487,34 +3490,66 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) set_test_mode(); #endif + /* Prune the QD's history table if it is too large */ if (Gp_role == GP_ROLE_DISPATCH) { - /* - * Prune the history table if it is too large - * - * We only keep history of constant length so that - * - The history table takes only constant amount of memory. - * - It is long enough so that it is almost impossible to receive - * packets from an IC instance that is older than the first one - * in the history. - */ - if (rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE)) - { - uint32 prune_id = sliceTable->ic_instance_id - CURSOR_IC_TABLE_SIZE; + CursorICHistoryTable *ich_table = &rx_control_info.cursorHistoryTable; + DistributedTransactionId distTransId = getDistributedTransactionId(); - /* - * Only prune if we didn't underflow -- also we want the prune id - * to be newer than the limit (hysteresis) + if (ich_table->count > (2 * ich_table->size)) + { + /* + * distTransId != lastDXatId + * Means the last transaction is finished, it's ok to make a prune. */ - if (prune_id < sliceTable->ic_instance_id) + if (distTransId != rx_control_info.lastDXatId) { if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) - elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id); - pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, prune_id); + elog(DEBUG1, "prune cursor history table (count %d), icid %d, prune_id %d", + ich_table->count, sliceTable->ic_instance_id, sliceTable->ic_instance_id); + pruneCursorIcEntry(ich_table, sliceTable->ic_instance_id); + } + /* + * distTransId == lastDXatId and they are not InvalidTransactionId(0) + * Means current (non Read-Only) transaction isn't finished, should not prune. 
+ */ + else if (rx_control_info.lastDXatId != InvalidTransactionId) + { + ; + } + /* + * distTransId == lastDXatId and they are InvalidTransactionId(0) + * Means they are the same transaction or different Read-Only transactions. + * + * For the latter, it's hard to get a perfect timepoint to prune: prune eagerly may + * cause problems (pruned current Txn's Ic instances), but prune in low frequency + * causes memory leak. + * + * So, we choose a simple algorithm to prune it here. And if it mistakenly prune out + * the still-in-used Ic instance (with lower id), the query may hang forever. + * Then user have to set a bigger gp_interconnect_cursor_ic_table_size value and + * try the query again, it is a workaround. + * + * More backgrounds please see: https://github.com/greenplum-db/gpdb/pull/16458 + */ + else + { + if (sliceTable->ic_instance_id > ich_table->size) + { + uint32 prune_id = sliceTable->ic_instance_id - ich_table->size; + Assert(prune_id < sliceTable->ic_instance_id); + + if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) + elog(DEBUG1, "prune cursor history table (count %d), icid %d, prune_id %d", + ich_table->count, sliceTable->ic_instance_id, prune_id); + pruneCursorIcEntry(ich_table, prune_id); + } } } - addCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id, gp_command_count); + addCursorIcEntry(ich_table, sliceTable->ic_instance_id, gp_command_count); + /* save the latest transaction id */ + rx_control_info.lastDXatId = distTransId; } /* now we'll do some setup for each of our Receiving Motion Nodes. */ diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c index 7984a9d61a4..c6540a8a8d5 100644 --- a/src/backend/cdb/cdbvars.c +++ b/src/backend/cdb/cdbvars.c @@ -201,6 +201,7 @@ int Gp_interconnect_queue_depth = 4; /* max number of messages * we drop. 
*/ int Gp_interconnect_snd_queue_depth = 2; int Gp_interconnect_mem_size = 10; +int Gp_interconnect_cursor_ic_table_size = 128; int Gp_interconnect_timer_period = 5; int Gp_interconnect_timer_checking_period = 20; int Gp_interconnect_default_rtt = 20; diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 18d33966de2..dce6ae61505 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -3782,6 +3782,17 @@ struct config_int ConfigureNamesInt_gp[] = NULL, NULL, NULL }, + { + {"gp_interconnect_cursor_ic_table_size", PGC_USERSET, GP_ARRAY_TUNING, + gettext_noop("Sets the size of Cursor History Table in the UDP interconnect"), + gettext_noop("You can try to increase it when a UDF which contains many concurrent " + "cursor queries hangs. The default value is 128.") + }, + &Gp_interconnect_cursor_ic_table_size, + 128, 128, 102400, + NULL, NULL, NULL + }, + { {"gp_interconnect_timer_period", PGC_USERSET, GP_ARRAY_TUNING, gettext_noop("Sets the timer period (in ms) for UDP interconnect"), diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h index fb94a3960b5..abbddb3cd8b 100644 --- a/src/include/cdb/cdbvars.h +++ b/src/include/cdb/cdbvars.h @@ -363,6 +363,16 @@ extern int Gp_interconnect_queue_depth; * */ extern int Gp_interconnect_snd_queue_depth; + +/* + * Cursor IC table size. + * + * For cursor case, there may be several concurrent interconnect + * instances on QD. The table is used to track the status of the + * instances, which is quite useful for "ACK the past and NAK the future" paradigm. 
+ * + */ +extern int Gp_interconnect_cursor_ic_table_size; extern int Gp_interconnect_timer_period; extern int Gp_interconnect_timer_checking_period; extern int Gp_interconnect_default_rtt; diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index ca9f7ef45f8..6d09f49155f 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -74,6 +74,7 @@ "gp_initial_bad_row_limit", "gp_interconnect_address_type", "gp_interconnect_cache_future_packets", + "gp_interconnect_cursor_ic_table_size", "gp_interconnect_debug_retry_interval", "gp_interconnect_default_rtt", "gp_interconnect_fc_method", From 94244036187514dc9992d3442862bb9748ad730e Mon Sep 17 00:00:00 2001 From: Marbin Tan Date: Thu, 24 Aug 2023 16:53:53 -0700 Subject: [PATCH 4/7] Remove `getaddrinfo` in `SendDummyPacket()` to address malloc deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SendDummyPacket` eventually calls `getaddrinfo` (which is reentrant), however, `getaddrinfo` is not an async-signal-safe function. `getaddrinfo` internally calls `malloc`, which is strongly advised against within a signal handler as it may cause deadlocks. Cache the accepted socket information for the listener, so that it can be reused in `SendDummyPacket()`. The purpose of `SendDummyPacket` is to exit more quickly; it circumvents the polling that happens, which eventually times out after 250ms. Without `SendDummyPacket()`, there will be multiple test failures since some tests expect the backend connection to terminate almost immediately. To view all the async-signal-safe functions, please view the signal-safety(7) — Linux manual page. 
Reviewed-by: Soumyadeep Chakraborty Reviewed-by: Andrew Repp --- contrib/interconnect/udp/ic_udpifc.c | 117 +++++++++------------------ 1 file changed, 40 insertions(+), 77 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 1af4c4c9e67..101e4a487a3 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -705,6 +705,9 @@ typedef struct ICStatistics /* Statistics for UDP interconnect. */ static ICStatistics ic_statistics; +static struct addrinfo udp_dummy_packet_addrinfo; +static struct sockaddr udp_dummy_packet_sockaddr; + /* UDP listen fd */ int UDP_listenerFd; @@ -734,7 +737,8 @@ static void SendDummyPacket(void); static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort); static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type); -static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily); +static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, + int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr); static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates, ExecSlice *sendSlice, int *pOutgoingCount); @@ -1567,13 +1571,12 @@ resetRxThreadError() pg_atomic_write_u32(&ic_control_info.eno, 0); } - /* * setupUDPListeningSocket * Setup udp listening socket. 
*/ static void -setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily) +setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr) { struct addrinfo *addrs = NULL; struct addrinfo *addr; @@ -1694,6 +1697,16 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil if (!addr || ic_socket == PGINVALID_SOCKET) goto startup_failed; + /* + * cache the successful addrinfo and sockaddr of the listening socket, so + * we can use this information to connect to the listening socket. + */ + if (listenerAddrinfo != NULL && listenerSockaddr != NULL ) + { + memcpy(listenerAddrinfo, addr, sizeof(udp_dummy_packet_addrinfo)); + memcpy(listenerSockaddr, addr->ai_addr, sizeof(udp_dummy_packet_sockaddr)); + } + /* Memorize the socket fd, kernel assigned port and address family */ *listenerSocketFd = ic_socket; if (listenerAddr.ss_family == AF_INET6) @@ -1839,8 +1852,9 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort) /* * setup listening socket and sending socket for Interconnect. */ - setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily); - setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily); + setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, + &udp_dummy_packet_addrinfo, &udp_dummy_packet_sockaddr); + setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL, NULL); /* Initialize receive control data. 
*/ resetMainThreadWaiting(&rx_control_info.mainWaitingState); @@ -1948,6 +1962,9 @@ static inline void CleanupMotionUDPIFC(void) ICSenderPort = 0; ICSenderFamily = 0; + memset(&udp_dummy_packet_addrinfo, 0, sizeof(udp_dummy_packet_addrinfo)); + memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr)); + #ifdef USE_ASSERT_CHECKING /* @@ -7978,74 +7995,33 @@ WaitInterconnectQuitUDPIFC(void) static void SendDummyPacket(void) { - int sockfd = -1; - int ret; - struct addrinfo *addrs = NULL; - struct addrinfo *rp; - struct addrinfo hint; - uint16 udp_listener; - char port_str[32] = {0}; - char *dummy_pkt = "stop it"; - int counter; - + int ret; + in_port_t udp_listener_port; + char *dummy_pkt = "stop it"; + int counter; + struct sockaddr_in *addr_in = NULL; + struct sockaddr_in dest_addr; /* * Get address info from interconnect udp listener port */ - udp_listener = GetListenPortUDP(); - snprintf(port_str, sizeof(port_str), "%d", udp_listener); - - MemSet(&hint, 0, sizeof(hint)); - hint.ai_socktype = SOCK_DGRAM; - hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */ - - /* Never do name resolution */ -#ifdef AI_NUMERICSERV - hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; -#else - hint.ai_flags = AI_NUMERICHOST; -#endif - - ret = pg_getaddrinfo_all(interconnect_address, port_str, &hint, &addrs); - if (ret || !addrs) - { - elog(LOG, "send dummy packet failed, pg_getaddrinfo_all(): %m"); - goto send_error; - } - - for (rp = addrs; rp != NULL; rp = rp->ai_next) - { - /* Create socket according to pg_getaddrinfo_all() */ - sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sockfd < 0) - continue; + udp_listener_port = (Gp_listener_port >> 16) & 0x0ffff; - if (!pg_set_noblock(sockfd)) - { - if (sockfd >= 0) - { - closesocket(sockfd); - sockfd = -1; - } - continue; - } - break; - } - - if (rp == NULL) - { - elog(LOG, "send dummy packet failed, create socket failed: %m"); - goto send_error; - } + addr_in = (struct sockaddr_in *) 
&udp_dummy_packet_sockaddr; + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin_family = addr_in->sin_family; + dest_addr.sin_port = htons(udp_listener_port); + dest_addr.sin_addr.s_addr = addr_in->sin_addr.s_addr; /* - * Send a dummy package to the interconnect listener, try 10 times + * Send a dummy package to the interconnect listener, try 10 times. + * We don't want to close the socket at the end of this function, since + * the socket will eventually close during the motion layer cleanup. */ - counter = 0; while (counter < 10) { counter++; - ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen); + ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest_addr, sizeof(dest_addr)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) @@ -8053,7 +8029,7 @@ SendDummyPacket(void) else { elog(LOG, "send dummy packet failed, sendto failed: %m"); - goto send_error; + return; } } break; @@ -8061,21 +8037,8 @@ SendDummyPacket(void) if (counter >= 10) { - elog(LOG, "send dummy packet failed, sendto failed: %m"); - goto send_error; + elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m"); } - - pg_freeaddrinfo_all(hint.ai_family, addrs); - closesocket(sockfd); - return; - -send_error: - - if (addrs) - pg_freeaddrinfo_all(hint.ai_family, addrs); - if (sockfd != -1) - closesocket(sockfd); - return; } void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id) From 30eb75188c168ae0044f9d160a6327a2ec7ff7b2 Mon Sep 17 00:00:00 2001 From: Marbin Tan Date: Mon, 25 Sep 2023 07:53:23 -0700 Subject: [PATCH 5/7] Accommodate for AF_INET6 when doing a motion layer IPC teardown Previously on commit 70306db18e2, we removed pg_getaddrinfo_all for signal handlers. However, in doing so, the capability of supporting both AF_INET6 and AF_INET was lost; this responsibility must now be handled by us. 
The commit mentioned above fixed the issue for AF_INET (IPv4), but not for AF_INET6 (IPv6). This commit addresses the situation for both AF_INET and AF_INET6. Reviewed-by: Soumyadeep Chakraborty --- contrib/interconnect/udp/ic_udpifc.c | 158 +++++++++++++++++---------- 1 file changed, 100 insertions(+), 58 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 101e4a487a3..b3632987859 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -705,8 +705,8 @@ typedef struct ICStatistics /* Statistics for UDP interconnect. */ static ICStatistics ic_statistics; -static struct addrinfo udp_dummy_packet_addrinfo; -static struct sockaddr udp_dummy_packet_sockaddr; +/* Cached sockaddr of the listening udp socket */ +static struct sockaddr_storage udp_dummy_packet_sockaddr; /* UDP listen fd */ int UDP_listenerFd; @@ -735,10 +735,15 @@ static void setRxThreadError(int eno); static void resetRxThreadError(void); static void SendDummyPacket(void); +static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len); +#if defined(__darwin__) +#define s6_addr32 __u6_addr.__u6_addr32 +static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest); +#endif static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort); static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type); static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, - int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr); + int *txFamily, struct sockaddr_storage *listenerSockaddr); static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates, ExecSlice *sendSlice, int *pOutgoingCount); @@ -1576,7 +1581,7 @@ resetRxThreadError() * Setup udp listening socket. 
*/ static void -setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr) +setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr) { struct addrinfo *addrs = NULL; struct addrinfo *addr; @@ -1697,16 +1702,6 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil if (!addr || ic_socket == PGINVALID_SOCKET) goto startup_failed; - /* - * cache the successful addrinfo and sockaddr of the listening socket, so - * we can use this information to connect to the listening socket. - */ - if (listenerAddrinfo != NULL && listenerSockaddr != NULL ) - { - memcpy(listenerAddrinfo, addr, sizeof(udp_dummy_packet_addrinfo)); - memcpy(listenerSockaddr, addr->ai_addr, sizeof(udp_dummy_packet_sockaddr)); - } - /* Memorize the socket fd, kernel assigned port and address family */ *listenerSocketFd = ic_socket; if (listenerAddr.ss_family == AF_INET6) @@ -1720,6 +1715,13 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil *txFamily = AF_INET; } + /* + * cache the successful sockaddr of the listening socket, so + * we can use this information to connect to the listening socket. + */ + if (listenerSockaddr != NULL) + memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage)); + /* Set up socket non-blocking mode */ if (!pg_set_noblock(ic_socket)) { @@ -1852,9 +1854,8 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort) /* * setup listening socket and sending socket for Interconnect. 
*/ - setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, - &udp_dummy_packet_addrinfo, &udp_dummy_packet_sockaddr); - setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL, NULL); + setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, &udp_dummy_packet_sockaddr); + setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL); /* Initialize receive control data. */ resetMainThreadWaiting(&rx_control_info.mainWaitingState); @@ -1962,7 +1963,6 @@ static inline void CleanupMotionUDPIFC(void) ICSenderPort = 0; ICSenderFamily = 0; - memset(&udp_dummy_packet_addrinfo, 0, sizeof(udp_dummy_packet_addrinfo)); memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr)); #ifdef USE_ASSERT_CHECKING @@ -3269,30 +3269,8 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS */ if (pEntry->txfd_family == AF_INET6) { - struct sockaddr_storage temp; - const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer; - struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp; - - memset(&temp, 0, sizeof(temp)); - elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address."); - - /* Construct a V4-to-6 mapped address. 
*/ - temp.ss_family = AF_INET6; - in6_new->sin6_family = AF_INET6; - in6_new->sin6_port = in->sin_port; - in6_new->sin6_flowinfo = 0; - - memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr)); - /* in6_new->sin6_addr.s6_addr16[5] = 0xffff; */ - ((uint16 *) &in6_new->sin6_addr)[5] = 0xffff; - /* in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; */ - memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4); - in6_new->sin6_scope_id = 0; - - /* copy it back */ - memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6)); - conn->peer_len = sizeof(struct sockaddr_in6); + ConvertToIPv4MappedAddr(&conn->peer, &conn->peer_len); } else { @@ -7989,6 +7967,59 @@ WaitInterconnectQuitUDPIFC(void) ic_control_info.threadCreated = false; } +/* + * If the socket was created AF_INET6, but the address we want to + * send to is IPv4 (AF_INET), we need to change the address + * format. On Linux, this is not necessary: glibc automatically + * handles this. But on MAC OSX and Solaris, we need to convert + * the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format. + * + * The comment above relies on getaddrinfo() via function getSockAddr to get + * the correct V4-mapped address. We need to be careful here as we need to + * ensure that the platform we are using is POSIX 1003-2001 compliant. + * Just to be on the safeside, we'll be keeping this function for + * now to be used for all platforms and not rely on POSIX. + * + * Since this can be called in a signal handler, we avoid the use of + * async-signal unsafe functions such as memset/memcpy + */ +static void +ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len) +{ + const struct sockaddr_in *in = (const struct sockaddr_in *) sockaddr; + struct sockaddr_storage temp = {0}; + struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp; + + /* Construct a IPv4-to-IPv6 mapped address. 
*/ + temp.ss_family = AF_INET6; + in6_new->sin6_family = AF_INET6; + in6_new->sin6_port = in->sin_port; + in6_new->sin6_flowinfo = 0; + + ((uint16 *) &in6_new->sin6_addr)[5] = 0xffff; + + in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; + in6_new->sin6_scope_id = 0; + + /* copy it back */ + *sockaddr = temp; + *o_len = sizeof(struct sockaddr_in6); +} + +#if defined(__darwin__) +/* macos does not accept :: as the destination, so we need to convert this to the IPv6 loopback */ +static void +ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest) +{ + char address[INET6_ADDRSTRLEN]; + /* we want to terminate our own process, so this should be local */ + const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &udp_dummy_packet_sockaddr; + inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address)); + if (strcmp("::", address) == 0) + ((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback; +} +#endif + /* * Send a dummy packet to interconnect thread to exit poll() immediately */ @@ -7996,21 +8027,34 @@ static void SendDummyPacket(void) { int ret; - in_port_t udp_listener_port; char *dummy_pkt = "stop it"; int counter; - struct sockaddr_in *addr_in = NULL; - struct sockaddr_in dest_addr; - /* - * Get address info from interconnect udp listener port - */ - udp_listener_port = (Gp_listener_port >> 16) & 0x0ffff; + struct sockaddr_storage dest; + socklen_t dest_len; - addr_in = (struct sockaddr_in *) &udp_dummy_packet_sockaddr; - memset(&dest_addr, 0, sizeof(dest_addr)); - dest_addr.sin_family = addr_in->sin_family; - dest_addr.sin_port = htons(udp_listener_port); - dest_addr.sin_addr.s_addr = addr_in->sin_addr.s_addr; + Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || udp_dummy_packet_sockaddr.ss_family == AF_INET6); + Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6); + + dest = udp_dummy_packet_sockaddr; + dest_len = (ICSenderFamily == AF_INET) ?
sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + + if (ICSenderFamily == AF_INET6) + { +#if defined(__darwin__) + if (udp_dummy_packet_sockaddr.ss_family == AF_INET6) + ConvertIPv6WildcardToLoopback(&dest); +#endif + if (udp_dummy_packet_sockaddr.ss_family == AF_INET) + ConvertToIPv4MappedAddr(&dest, &dest_len); + } + + if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == AF_INET6) + { + /* the size of AF_INET6 is bigger than the size of AF_INET, so + * converting from IPv6 to IPv4 may potentially not work. */ + ereport(LOG, errmsg("sending dummy packet failed: cannot send from AF_INET to receiving on AF_INET6")); + return; + } /* * Send a dummy package to the interconnect listener, try 10 times. @@ -8021,14 +8065,14 @@ SendDummyPacket(void) while (counter < 10) { counter++; - ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest_addr, sizeof(dest_addr)); + ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest, dest_len); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; else { - elog(LOG, "send dummy packet failed, sendto failed: %m"); + ereport(LOG, errmsg("send dummy packet failed, sendto failed: %m")); return; } } @@ -8036,9 +8080,7 @@ SendDummyPacket(void) } if (counter >= 10) - { - elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m"); - } + ereport(LOG, errmsg("send dummy packet failed, sendto failed with 10 times: %m")); } void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id) From 1e5ffffbee99129b4d1175765831c5eae625a54f Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Wed, 27 Apr 2022 15:10:35 +0800 Subject: [PATCH 6/7] Make sendControlMessage to retry when interrupted (#13371) `sendControlMessage()` method has no retry attempts; this refactor abstracted a `sendto` system call wrapper with retry enabled.
Co-authored-by: zwenlin --- contrib/interconnect/udp/ic_udpifc.c | 113 ++++++++++++++++----------- src/test/regress/init_file | 3 + 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index b3632987859..324aa2f074a 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -848,6 +848,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt); static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry); static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout); + +static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail); static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry, int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent); @@ -2224,9 +2226,6 @@ destroyConnHashTable(ConnHashTable *ht) /* * sendControlMessage * Helper function to send a control message. - * - * It is different from sendOnce which retries on interrupts... - * Here, we leave it to retransmit logic to handle these cases. 
*/ static inline void sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen) @@ -2247,13 +2246,10 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL if (gp_interconnect_full_crc) addCRC(pkt); - n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen); - - /* - * No need to handle EAGAIN here: no-space just means that we dropped the - * packet: our ordinary retransmit mechanism will handle that case - */ - + char errDetail[100]; + snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq); + /* Retry for infinite times since we have no retransmit mechanism for control message */ + n = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail); if (n < pkt->len) write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq); } @@ -5346,34 +5342,25 @@ prepareXmit(MotionConn *mConn) } /* - * sendOnce - * Send a packet. + * sendtoWithRetry + * Retry sendto logic and send the packets. 
*/ -static void -sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn) +static ssize_t +sendtoWithRetry(int socket, const void *message, size_t length, + int flags, const struct sockaddr *dest_addr, + socklen_t dest_len, int retry, const char *errDetail) { int32 n; - ChunkTransportStateEntryUDP *pEntry = NULL; - MotionConnUDP *conn = NULL; - - pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry); - Assert(pEntry); - - conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); - -#ifdef USE_ASSERT_CHECKING - if (testmode_inject_fault(gp_udpic_dropxmit_percent)) - { -#ifdef AMS_VERBOSE_LOGGING - write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid); -#endif - return; - } -#endif + int count = 0; xmit_retry: - n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0, - (struct sockaddr *) &conn->peer, conn->peer_len); + /* + * If given retry count is positive, retry up to the limited times. + * Otherwise, retry for unlimited times until succeed. + */ + if (retry > 0 && ++count > retry) + return n; + n = sendto(socket, message, length, flags, dest_addr, dest_len); if (n < 0) { int save_errno = errno; @@ -5381,8 +5368,15 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE if (errno == EINTR) goto xmit_retry; - if (errno == EAGAIN) /* no space ? not an error. */ - return; + /* + * EAGAIN: no space ? not an error. + * + * EFAULT: In Linux system call, it only happens when copying a socket + * address into kernel space failed, which is less likely to happen, + * but mocked heavily by our fault injection in regression tests. + */ + if (errno == EAGAIN || errno == EFAULT) + return n; /* * If Linux iptables (nf_conntrack?) 
drops an outgoing packet, it may @@ -5394,20 +5388,52 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE ereport(LOG, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Interconnect error writing an outgoing packet: %m"), - errdetail("error during sendto() for Remote Connection: contentId=%d at %s", - conn->mConn.remoteContentId, conn->mConn.remoteHostAndPort))); - return; + errdetail("error during sendto() %s", errDetail))); + return n; } ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Interconnect error writing an outgoing packet: %m"), errdetail("error during sendto() call (error:%d).\n" - "For Remote Connection: contentId=%d at %s", - save_errno, conn->mConn.remoteContentId, - conn->mConn.remoteHostAndPort))); + "%s", save_errno, errDetail))); /* not reached */ } + return n; +} + +/* + * sendOnce + * Send a packet. + */ +static void +sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn) +{ + int32 n; + ChunkTransportStateEntryUDP *pEntry = NULL; + MotionConnUDP *conn = NULL; + + pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry); + Assert(pEntry); + + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + +#ifdef USE_ASSERT_CHECKING + if (testmode_inject_fault(gp_udpic_dropxmit_percent)) + { +#ifdef AMS_VERBOSE_LOGGING + write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid); +#endif + return; + } +#endif + + char errDetail[100]; + snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s", + conn->mConn.remoteContentId, + conn->mConn.remoteHostAndPort); + n = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0, + (struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail); if (n != buf->pkt->len) { if (DEBUG1 >= log_min_messages) @@ -5419,7 +5445,6 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE logPkt("PKT 
DETAILS ", buf->pkt); #endif } - return; } @@ -5903,7 +5928,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged) ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("interconnect encountered a network error, please check your network"), - errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds.", + errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry, Gp_interconnect_transmit_timeout))); @@ -5924,7 +5949,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged) { ereport(WARNING, (errmsg("interconnect may encountered a network error, please check your network"), - errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries.", + errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.", buf->pkt->seq, buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry))); diff --git a/src/test/regress/init_file b/src/test/regress/init_file index 728f6d8ea42..d300d253b94 100644 --- a/src/test/regress/init_file +++ b/src/test/regress/init_file @@ -88,6 +88,9 @@ m/^WARNING: table ".*" contains rows in segment .*, which is outside the # of s # The following output is generated by \d on foreign tables, so ignore it. m/Distributed by: \(.*\)/ m/Distributed randomly/ +# The following output is an interconnect network warning, but still not error out, so ignore it. 
+m/WARNING: interconnect may encountered a network error, please check your network/ +m/Failing to send packet/ # directory_table test output is sensitive to the user running the tests m/^NOTICE:.*storage user mapping for .* does not exist for storage server/ From a97400c4c18956f68ad87352ae5887d961cfa00f Mon Sep 17 00:00:00 2001 From: Wenlin Zhang Date: Sun, 7 Apr 2024 09:54:06 +0800 Subject: [PATCH 7/7] Add hint message for MTU settings when IC reports ERROR "Failed to send packet" (#17164) * Add hint message for MTU settings when IC reports ERROR "Failed to send packet". --- contrib/interconnect/udp/ic_faultinjection.h | 8 +++++ contrib/interconnect/udp/ic_udpifc.c | 34 ++++++++++++++++--- .../regress/expected/icudp/icudp_full.out | 8 +++++ src/test/regress/sql/icudp/icudp_full.sql | 4 +++ 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/contrib/interconnect/udp/ic_faultinjection.h b/contrib/interconnect/udp/ic_faultinjection.h index e3477adecc0..f0602a19027 100644 --- a/contrib/interconnect/udp/ic_faultinjection.h +++ b/contrib/interconnect/udp/ic_faultinjection.h @@ -83,6 +83,7 @@ typedef enum { FINC_OS_NET_INTERFACE = 19, FINC_OS_MEM_INTERFACE = 20, FINC_OS_CREATE_THREAD = 21, + FINC_PKT_TOO_LONG = 22, /* These are used to inject network faults. 
*/ FINC_NET_RECV_ERROR = 23, @@ -301,6 +302,13 @@ testmode_sendto(const char *caller_name, int socket, const void *buffer, errno = EFAULT; return -1; + case FINC_PKT_TOO_LONG: + if (!FINC_HAS_FAULT(fault_type) || !is_pkt) + break; + write_log("inject fault to sendto: FINC_PKT_TOO_LONG"); + errno = EMSGSIZE; + return -1; + default: break; } diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index 324aa2f074a..3da9bde0af1 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -2246,10 +2246,23 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL if (gp_interconnect_full_crc) addCRC(pkt); - char errDetail[100]; - snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq); - /* Retry for infinite times since we have no retransmit mechanism for control message */ - n = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail); + /* retry 10 times for sending control message */ + int counter = 0; + while (counter < 10) + { + counter++; + n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen); + if (n < 0) + { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + continue; + else { + write_log("sendcontrolmessage: got errno %d", errno); + return; + } + } + break; + } if (n < pkt->len) write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq); } @@ -5392,6 +5405,19 @@ sendtoWithRetry(int socket, const void *message, size_t length, return n; } + /* + * If the OS can detect an MTU issue on the host network interfaces, we + * would get EMSGSIZE here. So, bail with a HINT about checking MTU. 
+ */ + if (errno == EMSGSIZE) + { + ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), + errmsg("Interconnect error writing an outgoing packet: %m"), + errdetail("error during sendto() call (error:%d).\n" + "%s", save_errno, errDetail), + errhint("check if interface MTU is equal across the cluster and lower than gp_max_packet_size"))); + } + ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Interconnect error writing an outgoing packet: %m"), errdetail("error during sendto() call (error:%d).\n" diff --git a/src/test/regress/expected/icudp/icudp_full.out b/src/test/regress/expected/icudp/icudp_full.out index 2f7a73594e8..62782df6e64 100644 --- a/src/test/regress/expected/icudp/icudp_full.out +++ b/src/test/regress/expected/icudp/icudp_full.out @@ -544,6 +544,14 @@ SELECT system_call_fault_injection_test(); (1 row) +-- inject faults for errMsgSize when packet is too long. +SET gp_udpic_fault_inject_bitmap = 4194304; +SELECT system_call_fault_injection_test(); + system_call_fault_injection_test +---------------------------------- + +(1 row) + -- disable ipv6 may increase the code coverage. SET gp_udpic_network_disable_ipv6 = 1; SELECT system_call_fault_injection_test(); diff --git a/src/test/regress/sql/icudp/icudp_full.sql b/src/test/regress/sql/icudp/icudp_full.sql index 1436a31cf55..6fd07f29aba 100644 --- a/src/test/regress/sql/icudp/icudp_full.sql +++ b/src/test/regress/sql/icudp/icudp_full.sql @@ -276,6 +276,10 @@ $$; SET gp_udpic_fault_inject_bitmap = 524288; SELECT system_call_fault_injection_test(); +-- inject faults for errMsgSize when packet is too long. +SET gp_udpic_fault_inject_bitmap = 4194304; +SELECT system_call_fault_injection_test(); + -- disable ipv6 may increase the code coverage. SET gp_udpic_network_disable_ipv6 = 1; SELECT system_call_fault_injection_test();