From 1119cf9c52c6bf00ecc5a45bc54cc86fdd575f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 12 Jul 2025 11:05:31 +0800 Subject: [PATCH 01/24] Extending gpfdist in Cloudberry Database to Support SFTP Protocol for Data Ingestion gpfdist is a file distribution program in Cloudberry that can load external data into the database in parallel. However, it has the drawback that data files must reside on the same machine as the tool. Therefore, extending it to support the SFTP protocol can address the above drawback and enable loading files from a remote server. Add the libssh2 library and specify the link. Implement remote data file reading using the libssh2 library with gpfdist. Extending gpfdist in Cloudberry Database to Support SFTP Protocol for Data Ingestion -- add LIBSSH2 macro ADD LIBSSH2 macro Label the SFTP-related code to indicate its characteristics. --- src/backend/Makefile | 1 + src/backend/utils/misc/fstream/Makefile | 4 +- src/backend/utils/misc/fstream/fstream.c | 449 +++++++++++++++++++++-- src/backend/utils/misc/fstream/gfile.c | 269 +++++++++++++- src/bin/gpfdist/Makefile | 2 +- src/bin/gpfdist/gpfdist.c | 43 ++- src/common/Makefile | 1 + src/include/cdb/cdbsreh.h | 1 + src/include/fstream/fstream.h | 9 + src/include/fstream/gfile.h | 31 ++ 10 files changed, 763 insertions(+), 47 deletions(-) diff --git a/src/backend/Makefile b/src/backend/Makefile index 44dbe7f0e15..a10fc61a79a 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -67,6 +67,7 @@ LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) # Cloudberry uses threads in the backend LIBS := $(LIBS) -lpthread +LIBS := $(LIBS) -lssh2 ifeq ($(with_systemd),yes) LIBS += -lsystemd diff --git a/src/backend/utils/misc/fstream/Makefile b/src/backend/utils/misc/fstream/Makefile index c8cea254e8c..835f31c3f0f 100644 --- a/src/backend/utils/misc/fstream/Makefile +++ b/src/backend/utils/misc/fstream/Makefile @@ -9,7 +9,9 @@ subdir = 
src/backend/utils/misc/fstream top_builddir = ../../../../.. include $(top_builddir)/src/Makefile.global -override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) +LDFLAGS += -lssh2 + +override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) $(CPPFLAGS) OBJS = fstream.o gfile.o diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 56a730b98ad..46ea60ad628 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -30,6 +30,8 @@ #include #endif +#include + char* format_error(char* c1, char* c2); @@ -147,6 +149,36 @@ static void glob_and_copyfree(glob_and_copy_t *pglob) } } +static void glob_and_copyfree_sftp(glob_and_copy_t *pglob) +{ + if (pglob->gl_pathc) + { + int i; + + for (i = 0; i < pglob->gl_pathc; i++) + { + gfile_free(pglob->gl_pathv[i]); + gfile_free(pglob->gl_username[i]); + gfile_free(pglob->gl_passwd[i]); + gfile_free(pglob->gl_hostaddr[i]); + gfile_free(pglob->gl_port[i]); + } + + gfile_free(pglob->gl_pathv); + gfile_free(pglob->gl_username); + gfile_free(pglob->gl_passwd); + gfile_free(pglob->gl_hostaddr); + gfile_free(pglob->gl_port); + + pglob->gl_pathc = 0; + pglob->gl_pathv = 0; + pglob->gl_username = 0; + pglob->gl_passwd = 0; + pglob->gl_hostaddr = 0; + pglob->gl_port = 0; + } +} + const char* fstream_get_error(fstream_t*fs) { @@ -315,7 +347,12 @@ int fstream_close_with_error(fstream_t* fs, char* error) if(fs->buffer) gfile_free(fs->buffer); - glob_and_copyfree(&fs->glob); + if (fs->fd.is_sftp) + { + glob_and_copyfree_sftp(&fs->glob); + } + else + glob_and_copyfree(&fs->glob); gfile_close(&fs->fd); #ifdef GPFXDIST /* @@ -453,6 +490,310 @@ static int glob_path(fstream_t *fs, const char *path) return 0; } +int get_sftp_counts(const char *sftp_request) +{ + const char *start; + const char *end; + int count1 = 0; + int count2 = 0; + + start = (char *)sftp_request; + end = (char *)sftp_request; + + while (*start || *end) + { + if (*start == '<') + count1++; + start++; + if (*end 
== '>') + count2++; + end++; + } + + if (count1 != count2) + return -1; + + return count1; +} + +int ParseFilePathUri(char *uri_str, sftp_info_t *info) +{ + + int protocol_len = 0; + int len = 0; + char *start, *end; + + if (strncmp(uri_str, "sftp://", 7) == 0) + { + protocol_len = 7; + } + else + { + return -2; + } + + start = (char *)uri_str + protocol_len; + end = strchr(start, ':'); + if (end == NULL) + { + + return -2; + } + else + { + len = end - start; + + char *username = NULL; + username = (char *)gfile_malloc(len + 1); + if (username == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(username, start, len); + username[len] = '\0'; + strcpy(info->username, username); + gfile_free(username); + } + start = end + 1; + end = strchr(start, '@'); + + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *passwd = NULL; + passwd = (char *)gfile_malloc(len + 1); + if (passwd == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(passwd, start, len); + passwd[len] = '\0'; + strcpy(info->password, passwd); + gfile_free(passwd); + } + + start = end + 1; + if (strncmp(start, "[", 1) == 0) + { + end = strchr(start, ']'); + if (end == NULL) + return -2; + + len = end - start; + char *hostaddr6 = NULL; + hostaddr6 = (char *)gfile_malloc(len); + if (hostaddr6 == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(hostaddr6, start + 1, len - 1); + hostaddr6[len - 1] = '\0'; + strcpy(info->hostaddr, hostaddr6); + gfile_free(hostaddr6); + + end++; + } + + else + { + end = strchr(start, ':'); + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *hostaddr4 = NULL; + hostaddr4 = (char *)gfile_malloc(len + 1); + if (hostaddr4 == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(hostaddr4, start, len); + hostaddr4[len] = '\0'; + strcpy(info->hostaddr, hostaddr4); + 
gfile_free(hostaddr4); + } + } + + start = end + 1; + end = strchr(start, '/'); + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *port = NULL; + port = (char *)gfile_malloc(len + 1); + if (port == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(port, start, len); + port[len] = '\0'; + strcpy(info->port, port); + gfile_free(port); + } + + start = end; + strcpy(info->fpath, start); + + return 0; +} + +static int order_by_hostaddr(const void *e1, const void *e2) +{ + struct in_addr ip1; + struct in_addr ip2; + int result = 0; + + inet_pton(AF_INET, ((struct sftp_info_t *)e1)->hostaddr, &ip1); + inet_pton(AF_INET, ((struct sftp_info_t *)e2)->hostaddr, &ip2); + + result = memcmp(&ip1, &ip2, sizeof(struct in_addr)); + + if (result > 0) + return 1; + else if (result < 0) + return -1; + else + return 0; +} + +static int fetch_sftp_paths(fstream_t *fs, const char *path) +{ + int counts = get_sftp_counts(path); + if(counts == -1) + { + gfile_printf_then_putc_newline("sftp path is Non-conforming, please check "); + return 1; + } + + if(counts == 0) + { + gfile_printf_then_putc_newline("sftp path is empty"); + return 1; + } + char **res = (char **)malloc(sizeof(char *) * counts); + + char **unames = (char **)gfile_malloc(sizeof(char *) * counts); + char **passwds = (char **)gfile_malloc(sizeof(char *) * counts); + char **hostaddrs = (char **)gfile_malloc(sizeof(char *) * counts); + char **ports = (char **)gfile_malloc(sizeof(char *) * counts); + char **fpaths = (char **)gfile_malloc(sizeof(char *) * counts); + + if (!unames || !passwds || !hostaddrs || !ports || !fpaths) + { + gfile_printf_then_putc_newline("out of memory"); + return 1; + } + + sftp_info_t sftp_info_lists[64]; + + fs->glob.gl_pathc = 0; + fs->glob.gl_username = unames; + fs->glob.gl_passwd = passwds; + fs->glob.gl_hostaddr = hostaddrs; + fs->glob.gl_port = ports; + fs->glob.gl_pathv = fpaths; + + { + const char *start; + const char *end; + 
start = (char *)path; + end = (char *)path; + + for (int i = 0; i < counts; i++) + { + while (*start != '<') + start++; + while (*end != '>') + end++; + int len = end - start - 1; + char *req = (char *)gfile_malloc(len + 1); + if (!req) + { + gfile_printf_then_putc_newline("out of memory"); + return 1; + } + strncpy(req, start + 1, len); + req[len] = '\0'; + res[i] = strdup(req); + gfile_free(req); + start++; + end++; + } + } + + for (int i = 0; i < counts; i++) + { + int parse_result = ParseFilePathUri(res[i], &sftp_info_lists[i]); + if(parse_result == 0) + continue; + else + { + for (int j = 0; j < counts; j++) + { + if(res[i]) + free(res[i]); + } + free(res); + } + } + + qsort(sftp_info_lists, counts, sizeof(sftp_info_t), order_by_hostaddr); + + for (int j = 0; j < counts; j++) + { + char *tmp_uname = sftp_info_lists[j].username; + char *tmp_passwd = sftp_info_lists[j].password; + char *tmp_hostaddr = sftp_info_lists[j].hostaddr; + char *tmp_port = sftp_info_lists[j].port; + char *tmp_fpath = sftp_info_lists[j].fpath; + + *unames = (char *)gfile_malloc(strlen(tmp_uname) + 1); + *passwds = (char *)gfile_malloc(strlen(tmp_passwd) + 1); + *hostaddrs = (char *)gfile_malloc(strlen(tmp_hostaddr) + 1); + *ports = (char *)gfile_malloc(strlen(tmp_port) + 1); + *fpaths = (char *)gfile_malloc(strlen(tmp_fpath) + 1); + + if (!(*unames) || !(*passwds) || !(*hostaddrs) || !(*ports) || !(*fpaths)) + { + gfile_printf_then_putc_newline("out of memory!!!"); + return 1; + } + + strcpy(*unames++, tmp_uname); + strcpy(*passwds++, tmp_passwd); + strcpy(*hostaddrs++, tmp_hostaddr); + strcpy(*ports++, tmp_port); + strcpy(*fpaths++, tmp_fpath); + + fs->glob.gl_pathc++; + } + + for (int i = 0; i < counts; i++) + { + free(res[i]); + } + free(res); + return 0; +} #ifdef GPFXDIST /* @@ -501,6 +842,7 @@ fstream_open(const char *path, const struct fstream_options *options, { int i; fstream_t* fs; + bool_t is_sftp = FALSE; *response_code = 500; *response_string = "Internal Server Error"; @@ 
-515,27 +857,44 @@ fstream_open(const char *path, const struct fstream_options *options, fs->options = *options; fs->buffer = gfile_malloc(options->bufsize); + if (strncmp(path, "/glob)) + else { - if (expand_directories(fs)) + if (glob_path(fs, path)) { fstream_close(fs); return 0; } + + /* + * If the list of files in our filestrem includes a directory name, expand + * the directory and add all the files inside of it. + */ + if (fpath_all_directories(&fs->glob)) + { + if (expand_directories(fs)) + { + fstream_close(fs); + return 0; + } + } } /* @@ -543,11 +902,14 @@ fstream_open(const char *path, const struct fstream_options *options, */ if (fs->glob.gl_pathc == 0) { - gfile_printf_then_putc_newline("fstream bad path: %s", path); - fstream_close(fs); - *response_code = 404; - *response_string = "No matching file(s) found"; - return 0; + if(!is_sftp) + { + gfile_printf_then_putc_newline("fstream bad path: %s", path); + fstream_close(fs); + *response_code = 404; + *response_string = "No matching file(s) found"; + return 0; + } } if (fs->glob.gl_pathc != 1 && options->forwrite) @@ -649,14 +1011,28 @@ fstream_open(const char *path, const struct fstream_options *options, struct gpfxdist_t* transform = (i == 0) ? 
options->transform : NULL; gfile_close(&fs->fd); - - if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), - response_code, response_string, transform)) +#ifdef LIBSSH2 + if (is_sftp) { - gfile_printf_then_putc_newline("fstream unable to open file %s", - fs->glob.gl_pathv[i]); - fstream_close(fs); - return 0; + if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[i], fs->glob.gl_username[i], fs->glob.gl_passwd[i], + fs->glob.gl_hostaddr[i], fs->glob.gl_port[i], gfile_open_flags(options->forwrite, options->usesync), response_code, response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", fs->glob.gl_pathv[i]); + fstream_close(fs); + return 0; + } + } +#endif + else + { + if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), + response_code, response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[i]); + fstream_close(fs); + return 0; + } } fs->compressed_size += gfile_get_compressed_size(&fs->fd); @@ -707,15 +1083,30 @@ static int nextFile(fstream_t*fs) if (fs->fidx < fs->glob.gl_pathc) { fs->skip_header_line = fs->options.header; - - if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, - &response_code, &response_string, transform)) +#ifdef LIBSSH2 + if (fs->fd.is_sftp) { - gfile_printf_then_putc_newline("fstream unable to open file %s", - fs->glob.gl_pathv[fs->fidx]); - fs->ferror = "unable to open file"; - return 1; + if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[fs->fidx], fs->glob.gl_username[fs->fidx], fs->glob.gl_passwd[fs->fidx], + fs->glob.gl_hostaddr[fs->fidx], fs->glob.gl_port[fs->fidx], GFILE_OPEN_FOR_READ, &response_code, &response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[fs->fidx]); + fs->ferror = "unable to open file"; + return 1; + } } +#endif + else + { + if 
(gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, + &response_code, &response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[fs->fidx]); + fs->ferror = "unable to open file"; + return 1; + } + } } return 0; diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 070ca1c649c..8ce1213d412 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -40,6 +40,14 @@ #include /* for flock */ #include +#ifdef LIBSSH2 +#include +#include +#endif + +#include +#include + #ifdef WIN32 #include #define snprintf _snprintf @@ -146,6 +154,47 @@ writewinpipe(gfile_t* fd, void* ptr, size_t size) return i; } +#ifdef LIBSSH2 +static ssize_t +sftp_read(gfile_t *fd, void *ptr, size_t size) +{ + ssize_t i = 0; + do + i = libssh2_sftp_read(fd->sftp_handle, ptr, size); + while (i < 0 && errno == EINTR); + + if (i < 0) + gfile_printf_then_putc_newline("i is %ld", i); + + if (i > 0) + fd->compressed_position += i; + return i; +} + +static int +sftp_close(gfile_t *fd) +{ + if (fd->sftp_handle) + libssh2_sftp_close(fd->sftp_handle); + if (fd->sftp_session) + libssh2_sftp_shutdown(fd->sftp_session); + if (fd->session) + { + libssh2_session_disconnect(fd->session, "Normal Shutdown"); + libssh2_session_free(fd->session); + } + +#ifdef WIN32 + closesocket(fd->sock); +#else + close(fd->sock); +#endif + fd->sock = -1; + libssh2_exit(); + return 0; +} +#endif + #ifdef HAVE_LIBBZ2 static void * bz_alloc(void *a, int b, int c) @@ -1334,23 +1383,32 @@ gfile_close(gfile_t*fd) { fd->close(fd); } - - if (fd->is_win_pipe) +#ifdef LIBSSH2 + if (fd->is_sftp) { - fd->close(fd); + sftp_close(fd); } +#endif else { - if(fd->held_pipe_lock) + if (fd->is_win_pipe) { + fd->close(fd); + } + else + { + if(fd->held_pipe_lock) + { #ifndef WIN32 - flock (fd->fd.filefd, LOCK_UN); + flock (fd->fd.filefd, LOCK_UN); #endif + } + ret = 
close_filefd(fd->fd.filefd); + if (ret == -1) + ret = 1; } - ret = close_filefd(fd->fd.filefd); - if (ret == -1) - ret = 1; } + } fd->read = 0; fd->close = 0; @@ -1358,6 +1416,96 @@ gfile_close(gfile_t*fd) return ret; } +#ifdef LIBSSH2 +int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform) +{ + const char *s = strrchr(fpath, '.'); + + //struct stat sta; + LIBSSH2_SFTP_ATTRIBUTES sta; + memset(&sta, 0, sizeof(sta)); + memset(fd, 0, sizeof *fd); + fd->is_sftp = TRUE; + + int ans = sftp_open(fd, fpath, sftp_uname, sftp_passwd, sftp_hostaddr, sftp_port); + if (ans == 0) + { + gfile_printf_then_putc_newline("looks like a ftp handle"); + gfile_printf_then_putc_newline("path is %s", fpath); + } + else + { + gfile_printf_then_putc_newline("failed open a ftp handle, please check sftp-information: ip, port, passwd and filename"); + if (ans == -2) + { + sftp_free(fd); + } + return 1; + } + + if (flags == GFILE_OPEN_FOR_READ) + { + if (0 != libssh2_sftp_stat(fd->sftp_session, fpath, &sta)) + { + gfile_printf_then_putc_newline("libssh2 libssh2_sftp_stat failed"); + return 1; + } + } + + fd->compressed_size = sta.filesize; + + fd->read = sftp_read; + fd->close = sftp_close; + + /* + * delegate remaining setup work to an appropriate open routine + * or return an error if we can't handle the type + */ + if (s && strcasecmp(s, ".gz") == 0) + { +#ifndef HAVE_LIBZ + gfile_printf_then_putc_newline(".gz not supported"); +#else + /* + * flag used by function gfile close + */ + fd->compression = GZ_COMPRESSION; + + if (flags != GFILE_OPEN_FOR_READ) + { + fd->is_write = TRUE; + } + + return gz_file_open(fd); +#endif + } + else if (s && strcasecmp(s, ".bz2") == 0) + { +#ifndef HAVE_LIBBZ2 + gfile_printf_then_putc_newline(".bz2 not supported"); +#else + fd->compression = BZ_COMPRESSION; + if (flags != 
GFILE_OPEN_FOR_READ) + gfile_printf_then_putc_newline(".bz2 not yet supported for writable tables"); + + return bz_file_open(fd); +#endif + } + else if (s && strcasecmp(s, ".z") == 0) + gfile_printf_then_putc_newline("gfile compression .z file is not supported"); + else if (s && strcasecmp(s, ".zip") == 0) + gfile_printf_then_putc_newline("gfile compression zip is not supported"); + else + return 0; + + *response_code = 415; + *response_string = "Unsupported File Type"; + + return 1; +} +#endif + ssize_t gfile_read(gfile_t *fd, void *ptr, size_t len) { @@ -1407,3 +1555,108 @@ off_t gfile_get_compressed_position(gfile_t *fd) { return fd->compressed_position; } + +#ifdef LIBSSH2 +int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port) +{ + int rc; + int auth_pw = 0; + char *userauthlist; + uint16_t port; + unsigned long hostaddr; + struct sockaddr_in sin; + + hostaddr = inet_addr(sftp_hostaddr); + + sscanf(sftp_port, "%hu", &port); + + rc = libssh2_init(0); + + if (rc != 0) + { + gfile_printf_then_putc_newline("libssh2 initialization failed (%d)\n", rc); + return -1; + } + + fd->sock = -1; + + fd->sock = socket(AF_INET, SOCK_STREAM, 0); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = hostaddr; + + if (connect(fd->sock, (struct sockaddr *)(&sin), + sizeof(struct sockaddr_in)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } + + fd->session = libssh2_session_init(); + if (!fd->session) + return -1; + + /* Since we have set non-blocking, tell libssh2 we are blocking */ + libssh2_session_set_blocking(fd->session, 1); + + /* ... start it up. 
This will trade welcome banners, exchange keys, + * and setup crypto, compression, and MAC layers + */ + rc = libssh2_session_handshake(fd->session, fd->sock); + if (rc) + { + gfile_printf_then_putc_newline("Failure establishing SSH session: %d\n", rc); + return -1; + } + + /* check what authentication methods are available */ + userauthlist = libssh2_userauth_list(fd->session, sftp_uname, strlen(sftp_uname)); + if (strstr(userauthlist, "password") != NULL) + { + auth_pw |= 1; + } + + if (auth_pw & 1) + { + if (libssh2_userauth_password(fd->session, sftp_uname, sftp_passwd)) + { + gfile_printf_then_putc_newline("Authentication by password failed.\n"); + return -2; + } + } + + gfile_printf_then_putc_newline("libssh2_sftp_init()!\n"); + fd->sftp_session = libssh2_sftp_init(fd->session); + + if (!(fd->sftp_session)) + { + gfile_printf_then_putc_newline("Unable to init SFTP session\n"); + return -2; + } + + fd->sftp_handle = + libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0); + if (!(fd->sftp_handle)) + { + gfile_printf_then_putc_newline("Unable to open file with SFTP: %ld\n", + libssh2_sftp_last_error(fd->sftp_session)); + return -2; + } + + return 0; +} +void sftp_free(gfile_t *fd) +{ + libssh2_session_disconnect(fd->session, "Normal Shutdown"); + libssh2_session_free(fd->session); + +#ifdef WIN32 + closesocket(fd->sock); +#else + close(fd->sock); +#endif + fd->sock = -1; + libssh2_exit(); +} +#endif \ No newline at end of file diff --git a/src/bin/gpfdist/Makefile b/src/bin/gpfdist/Makefile index 3739e3835c5..cb84109b309 100644 --- a/src/bin/gpfdist/Makefile +++ b/src/bin/gpfdist/Makefile @@ -25,7 +25,7 @@ ifeq ($(PORTNAME),win32) OBJS += $(top_builddir)/src/port/glob.o endif -LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs) +LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs) -lssh2 all: gpfdist$(X) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index 78fc2abc26b..96bbd05e12d 100644 --- a/src/bin/gpfdist/gpfdist.c +++ 
b/src/bin/gpfdist/gpfdist.c @@ -1273,6 +1273,10 @@ static void request_end(request_t* r, int error, const char* errmsg, int sendhea static int local_send(request_t *r, const char* buf, int buflen) { int n = gpfdist_send(r, buf, buflen); + + int is_sftp_type = 0; + if(strncmp(r->path, "/session; retblock->bot = retblock->top = 0; @@ -1662,20 +1666,36 @@ static void sessions_cleanup(void) static int session_attach(request_t* r) { char key[1024]; + char tmp_key[1024]; session_t* session = NULL; - + int is_sftp_mode = 0; /* * create the session key (tid:path) */ - if (sizeof(key) - 1 == apr_snprintf(key, sizeof(key), "%s:%s", - r->tid, r->path)) + if (strncmp(r->path, "/tid) + strlen(r->path) >= 1024) + { + gwarning(NULL, "sftp_path is too long"); + request_end(r, 1, 0, 0); + return -1; + } } + else + { + if (sizeof(tmp_key) - 1 == apr_snprintf(tmp_key, sizeof(tmp_key), "%s:%s", + r->tid, r->path)) + { + http_error(r, FDIST_BAD_REQUEST, "path too long"); + request_end(r, 1, 0, 0); + return -1; + } + } + apr_snprintf(key, sizeof(key), "%s:%s", r->tid, r->path); /* check if such session already exists in hashtable */ session = apr_hash_get(gcb.session.tab, key, APR_HASH_KEY_STRING); @@ -1769,7 +1789,8 @@ static int session_attach(request_t* r) fstream_options.transform->errfile = r->trans.errfile; fstream_options.transform->stderr_server = r->trans.stderr_server; } - gprintlnif(r, "r->path %s", r->path); + if (is_sftp_mode == 0) + gprintlnif(r, "r->path %s", r->path); #endif /* try opening the fstream */ @@ -2069,6 +2090,7 @@ static void do_read_request(int fd, short event, void* arg) char* p = NULL; char* pp = NULL; char* path = NULL; + bool is_sftp_type = false; /* If we timeout, close the request. */ if (event & EV_TIMEOUT) @@ -2208,6 +2230,11 @@ static void do_read_request(int fd, short event, void* arg) /* we forced in a filename with the hidden -f option. 
use it */ r->path = opt.f; } + else if (!strncmp(path, "/path = apr_psprintf(r->pool, "%s", path); + } else { if(request_set_path(r, opt.d, p, pp, path) != 0) diff --git a/src/common/Makefile b/src/common/Makefile index 4549e6a24fb..469190571a8 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -42,6 +42,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\"" override CPPFLAGS := -DFRONTEND -I. -I$(top_srcdir)/src/common $(CPPFLAGS) LIBS += $(PTHREAD_LIBS) +LIBS += -lssh2 # If you add objects here, see also src/tools/msvc/Mkvcbuild.pm diff --git a/src/include/cdb/cdbsreh.h b/src/include/cdb/cdbsreh.h index fdb29716d3f..97ece750a1e 100644 --- a/src/include/cdb/cdbsreh.h +++ b/src/include/cdb/cdbsreh.h @@ -100,5 +100,6 @@ extern Datum gp_truncate_error_log(PG_FUNCTION_ARGS); extern Datum gp_read_persistent_error_log(PG_FUNCTION_ARGS); extern Datum gp_truncate_persistent_error_log(PG_FUNCTION_ARGS); +extern int get_sftpfile_numbers(const char *path); #endif /* CDBSREH_H */ diff --git a/src/include/fstream/fstream.h b/src/include/fstream/fstream.h index 2c848490b88..e490868e4bc 100644 --- a/src/include/fstream/fstream.h +++ b/src/include/fstream/fstream.h @@ -23,6 +23,12 @@ typedef struct { int gl_pathc; char** gl_pathv; + char** gl_username; + char** gl_passwd; + char** gl_keyfile1; + char** gl_keyfile2; + char** gl_hostaddr; + char** gl_port; } glob_and_copy_t; struct fstream_options{ @@ -85,4 +91,7 @@ int fstream_close_with_error(fstream_t* fs, char* msg); void fstream_close(fstream_t* fs); bool_t fstream_is_win_pipe(fstream_t *fs); +int ParseFilePathUri(char *uri_str, sftp_info_t *info); +int get_sftp_counts(const char *sftp_request); + #endif diff --git a/src/include/fstream/gfile.h b/src/include/fstream/gfile.h index 565a7d910c9..16954c9dece 100644 --- a/src/include/fstream/gfile.h +++ b/src/include/fstream/gfile.h @@ -12,6 +12,11 @@ #include #endif +#ifdef LIBSSH2 +#include +#include +#endif + #ifdef WIN32 #include #endif @@ -57,6 +62,7 @@ typedef struct 
gfile_t off_t compressed_size,compressed_position; bool_t is_win_pipe; bool_t held_pipe_lock; /* Whether held flock on pipe file, used to restrict only one reader of pipe */ + bool_t is_sftp; union { @@ -66,6 +72,11 @@ typedef struct gfile_t #endif } fd; + LIBSSH2_SESSION *session; + LIBSSH2_SFTP *sftp_session; + LIBSSH2_SFTP_HANDLE *sftp_handle; + int sock; + union { int txt; @@ -85,6 +96,19 @@ typedef struct gfile_t struct gpfxdist_t* transform; } gfile_t; +/* Struct of sftp info */ +typedef struct sftp_info_t sftp_info_t; +struct sftp_info_t +{ + char username[32]; + char password[64]; + char keyfile1[64]; + char keyfile2[64]; + char hostaddr[64]; + char port[16]; + char fpath[256]; +}; + /* * MPP-13817 (support opening files without O_SYNC) */ @@ -103,4 +127,11 @@ void gfile_printf_then_putc_newline(const char*format,...) pg_attribute_printf(1 void*gfile_malloc(size_t size); void gfile_free(void*a); +#ifdef LIBSSH2 +int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform); +extern int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port); +extern void sftp_free(gfile_t *fd); +#endif #endif From 23f372b0cb043db53dc12f04ac36c70d49e3e8c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Fri, 18 Jul 2025 15:47:24 +0800 Subject: [PATCH 02/24] Feature: Add SFTP support to gpfdist for data ingestion gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. 
This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: when configured with --enable-gpfdist, configure dynamically checks for libssh2 and, if it exists, defines the LIBSSH2 macro. --- configure | 91 ++++++++++++++++++++++++ configure.ac | 14 ++++ src/backend/utils/misc/fstream/fstream.c | 5 +- 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/configure b/configure index 2905dd719fa..6af3f67d5bd 100755 --- a/configure +++ b/configure @@ -814,6 +814,8 @@ enable_faultinjector enable_debug_extensions enable_pxf enable_gpfdist +LIBSSH2_LIBS +LIBSSH2_CFLAGS enable_strong_random enable_rpath default_port @@ -975,6 +977,8 @@ XML2_CFLAGS XML2_LIBS LZ4_CFLAGS LZ4_LIBS +LIBSSH2_LIBS +LIBSSH2_CFLAGS LDFLAGS_EX LDFLAGS_SL PERL @@ -1734,6 +1738,10 @@ Some influential environment variables: XML2_LIBS linker flags for XML2, overriding pkg-config LZ4_CFLAGS C compiler flags for LZ4, overriding pkg-config LZ4_LIBS linker flags for LZ4, overriding pkg-config + LIBSSH2_CFLAGS + C compiler flags for LIBSSH2, overriding pkg-config + LIBSSH2_LIBS + linker flags for LIBSSH2, overriding pkg-config LDFLAGS_EX extra linker flags for linking executables only LDFLAGS_SL extra linker flags for linking shared libraries only PERL Perl program @@ -15154,6 +15162,89 @@ fi LIBS="$_LIBS" fi +# +# LIBSSH2 +# +# +if test "$enable_gpfdist" = yes; then + # Check libssh2 >= 1.0.0 + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libssh2 >= 1.0.0" >&5 +$as_echo_n "checking for libssh2 >= 1.0.0... " >&6; } + +if test -n "$LIBSSH2_CFLAGS"; then + pkg_cv_LIBSSH2_CFLAGS="$LIBSSH2_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? 
= $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBSSH2_CFLAGS=`$PKG_CONFIG --cflags "libssh2 >= 1.1.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$LIBSSH2_LIBS"; then + pkg_cv_LIBSSH2_LIBS="$LIBSSH2_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBSSH2_LIBS=`$PKG_CONFIG --libs "libssh2 >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1` + else + LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$LIBSSH2_PKG_ERRORS" >&5 + + as_fn_error $? "libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5 + +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + as_fn_error $? 
"libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5 + +else + LIBSSH2_CFLAGS=$pkg_cv_LIBSSH2_CFLAGS + LIBSSH2_LIBS=$pkg_cv_LIBSSH2_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +$as_echo "#define LIBSSH2 1" >>confdefs.h + +fi + + # # SSL Library # diff --git a/configure.ac b/configure.ac index 4dfcd8bf8c9..143284f6009 100644 --- a/configure.ac +++ b/configure.ac @@ -225,6 +225,15 @@ PGAC_ARG_BOOL(enable, gpfdist, yes, [do not use gpfdist]) AC_SUBST(enable_gpfdist) +if test "$enable_gpfdist" = yes; then + # Check libssh2 >= 1.0.0 + PKG_CHECK_MODULES([LIBSSH2], [libssh2 >= 1.0.0], + [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], + [AC_MSG_ERROR([libssh2 >= 1.0.0 is required for gpfdist support])] + ) + AC_MSG_RESULT([checking whether to build with gpfdist support ... yes]) +fi + # # pxf # @@ -1652,6 +1661,10 @@ if test "$enable_gpfdist" = yes ; then EVENT_LIBS=" -levent" AC_SUBST(EVENT_LIBS) + AC_SEARCH_LIBS(libssh2_init, [libssh2], [have_libssh2=yes; LIBSSH2_LIBS=" -lssh2"], [AC_MSG_ERROR([libssh2 is required for gpfdist])]) + AC_SUBST(LIBSSH2_LIBS) + AC_SUBST(have_libssh2) + AC_SEARCH_LIBS(yaml_parser_initialize, [yaml], [have_yaml=yes; YAML_LIBS=" -lyaml"], [AC_MSG_WARN([libyaml is not found. disabling transformations for gpfdist.])]) AC_SUBST(YAML_LIBS) AC_SUBST(have_yaml) @@ -1950,6 +1963,7 @@ if test "$enable_gpfdist" = yes; then AC_CHECK_HEADERS(yaml.h, [], [AC_MSG_WARN([header file is not found. 
disabling transformations for gpfdist.])]) AC_CHECK_HEADERS(event.h, [], [AC_MSG_ERROR([header file is required for gpfdist])]) + AC_CHECK_HEADERS(libssh2, [], [AC_MSG_ERROR([header file is required for gpfdist])]) ac_save_CPPFLAGS=$CPPFLAGS CPPFLAGS="$apr_includes $CPPFLAGS" diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 46ea60ad628..6a0f002a91a 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -1095,7 +1095,7 @@ static int nextFile(fstream_t*fs) return 1; } } -#endif +#else else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, @@ -1106,7 +1106,8 @@ static int nextFile(fstream_t*fs) fs->ferror = "unable to open file"; return 1; } - } + } +#endif } return 0; From 536a830690466911a20f4087543ab4e398e16ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 19 Jul 2025 16:13:31 +0800 Subject: [PATCH 03/24] Feature: Add SFTP support to gpfdist for data ingestion gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Adding the `LIBSSH2` preprocessor macro to conditionally compile the new SFTP-related code. When configured with --enable-gpfdist, configure dynamically checks for libssh2; if it exists, the LIBSSH2 macro is defined, otherwise a warning is printed. 
--- configure.ac | 6 +++--- src/backend/utils/misc/fstream/fstream.c | 10 +++++----- src/backend/utils/misc/fstream/gfile.c | 5 +++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/configure.ac b/configure.ac index 143284f6009..3dd3779162b 100644 --- a/configure.ac +++ b/configure.ac @@ -228,9 +228,9 @@ AC_SUBST(enable_gpfdist) if test "$enable_gpfdist" = yes; then # Check libssh2 >= 1.0.0 PKG_CHECK_MODULES([LIBSSH2], [libssh2 >= 1.0.0], - [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], - [AC_MSG_ERROR([libssh2 >= 1.0.0 is required for gpfdist support])] - ) + [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], + [AC_MSG_WARN([libssh2 >= 1.0.0 not found, gpfdist will build without libssh2 support])] +) AC_MSG_RESULT([checking whether to build with gpfdist support ... yes]) fi diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 6a0f002a91a..cbb47dedef1 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -1011,9 +1011,10 @@ fstream_open(const char *path, const struct fstream_options *options, struct gpfxdist_t* transform = (i == 0) ? 
options->transform : NULL; gfile_close(&fs->fd); -#ifdef LIBSSH2 + if (is_sftp) { +#ifdef LIBSSH2 if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[i], fs->glob.gl_username[i], fs->glob.gl_passwd[i], fs->glob.gl_hostaddr[i], fs->glob.gl_port[i], gfile_open_flags(options->forwrite, options->usesync), response_code, response_string, transform)) { @@ -1021,8 +1022,8 @@ fstream_open(const char *path, const struct fstream_options *options, fstream_close(fs); return 0; } - } #endif + } else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), @@ -1083,9 +1084,9 @@ static int nextFile(fstream_t*fs) if (fs->fidx < fs->glob.gl_pathc) { fs->skip_header_line = fs->options.header; -#ifdef LIBSSH2 if (fs->fd.is_sftp) { +#ifdef LIBSSH2 if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[fs->fidx], fs->glob.gl_username[fs->fidx], fs->glob.gl_passwd[fs->fidx], fs->glob.gl_hostaddr[fs->fidx], fs->glob.gl_port[fs->fidx], GFILE_OPEN_FOR_READ, &response_code, &response_string, transform)) { @@ -1094,8 +1095,8 @@ static int nextFile(fstream_t*fs) fs->ferror = "unable to open file"; return 1; } +#endif } -#else else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, @@ -1107,7 +1108,6 @@ static int nextFile(fstream_t*fs) return 1; } } -#endif } return 0; diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 8ce1213d412..0925a7c07e2 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -1383,12 +1383,13 @@ gfile_close(gfile_t*fd) { fd->close(fd); } -#ifdef LIBSSH2 if (fd->is_sftp) { +#ifdef LIBSSH2 sftp_close(fd); - } #endif + } + else { if (fd->is_win_pipe) From 080908d02666df9989e1a5df5c9d2d88b73df472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 21 Jul 2025 19:34:59 +0800 Subject: [PATCH 04/24] Feature: Add SFTP support to gpfdist for data gpfdist, Cloudberry's parallel file distribution 
program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Fix the compilation errors related to the libssh2 library option in the configure file. --- configure | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/configure b/configure index 6af3f67d5bd..120cb401d51 100755 --- a/configure +++ b/configure @@ -15240,11 +15240,10 @@ else { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 $as_echo "yes" >&6; } +fi $as_echo "#define LIBSSH2 1" >>confdefs.h - fi - # # SSL Library # From ecf10ac276978780fb60dc6d272990108f33d9de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 26 Jul 2025 11:07:46 +0800 Subject: [PATCH 05/24] Add SFTP support to gpfdist for data loading gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Fix the compilation errors related to the libssh2 library option in the configure file.
--- src/include/fstream/gfile.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/include/fstream/gfile.h b/src/include/fstream/gfile.h index 16954c9dece..0bc01f644e9 100644 --- a/src/include/fstream/gfile.h +++ b/src/include/fstream/gfile.h @@ -72,10 +72,12 @@ typedef struct gfile_t #endif } fd; +#ifdef LIBSSH2 LIBSSH2_SESSION *session; LIBSSH2_SFTP *sftp_session; LIBSSH2_SFTP_HANDLE *sftp_handle; int sock; +#endif union { From 3193c63aa876ea78111a3d8b8eb78c55a32dc5e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 26 Jul 2025 10:17:08 +0800 Subject: [PATCH 06/24] Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of bz Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Implement the loading of .bz2 files by utilizing the read functions provided by the libssh2 library. Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of bz(bz2) Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. 
Key change information: Implement the loading of .bz2 files by utilizing the read functions provided by the libssh2 library. --- src/backend/utils/misc/fstream/gfile.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 0925a7c07e2..f8336a2b7d4 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -241,8 +241,17 @@ bz_file_read(gfile_t *fd, void *ptr, size_t len) while (z->in_size < sizeof z->in) { - s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - - z->in_size); + if (fd->is_sftp) + { +#ifdef LIBSSH2 + gfile_printf_then_putc_newline("sftp_read : Read bz files from an SFTP server"); + s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size); +#endif + } + + else + s = read_and_retry(fd, z->in + z->in_size, sizeof z->in + - z->in_size); if (s == 0) break; if (s < 0) From 81649bddc428ff3b1c527af2a314ba57a015bf5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 10:38:13 +0800 Subject: [PATCH 07/24] Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of gz Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Implement the loading of gz files by utilizing the read functions provided by the libssh2 library. 
--- src/backend/utils/misc/fstream/gfile.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index f8336a2b7d4..9ca0931a37d 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -364,7 +364,15 @@ gz_file_read(gfile_t* fd, void* ptr, size_t len) */ while (z->in_size < sizeof z->in) { - s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size); + if (fd->is_sftp) + { +#ifdef LIBSSH2 + gfile_printf_then_putc_newline("sftp_read : Read gz files from an SFTP server"); + s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size); +#endif + } + else + s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size); if (s == 0) { From 18f20c9e747f77b118d12f3e78f36ce105384273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 11:20:13 +0800 Subject: [PATCH 08/24] Add SFTP support to gpfdist for data ingestion Feature:Support for SFTP server data access with IPv6 addresses. gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Support for SFTP server data access based on address type, including both IPv4 and IPv6 addresses. 
--- src/backend/utils/misc/fstream/gfile.c | 51 +++++++++++++++++++++----- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 9ca0931a37d..8c1fd2b3a25 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -1584,8 +1584,23 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char uint16_t port; unsigned long hostaddr; struct sockaddr_in sin; + struct sockaddr_in6 sin6; + int is_ipv6 = 0; - hostaddr = inet_addr(sftp_hostaddr); + if (strchr(sftp_hostaddr, ':')) + { + is_ipv6 = 1; + } + if (!is_ipv6) + { + hostaddr = inet_addr(sftp_hostaddr); + } + + else + { + bzero(&sin6, sizeof(sin6)); + inet_pton(AF_INET6, sftp_hostaddr, &sin6.sin6_addr); + } sscanf(sftp_port, "%hu", &port); @@ -1599,18 +1614,34 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char fd->sock = -1; - fd->sock = socket(AF_INET, SOCK_STREAM, 0); - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - sin.sin_addr.s_addr = hostaddr; - - if (connect(fd->sock, (struct sockaddr *)(&sin), - sizeof(struct sockaddr_in)) != 0) + if (!is_ipv6) { - gfile_printf_then_putc_newline("failed to connect!\n"); - return -1; + fd->sock = socket(AF_INET, SOCK_STREAM, 0); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = hostaddr; + + if (connect(fd->sock, (struct sockaddr *)(&sin), + sizeof(struct sockaddr_in)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } } + else + { + fd->sock = socket(AF_INET6, SOCK_STREAM, 0); + sin6.sin6_family = AF_INET6; + sin6.sin6_port = htons(port); + if (connect(fd->sock, (struct sockaddr *)(&sin6), + sizeof(struct sockaddr_in6)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } + } + fd->session = libssh2_session_init(); if (!fd->session) return -1; From 
d7c262f0dd4720fc439b7dd7a7f4e63766920c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 19:49:30 +0800 Subject: [PATCH 09/24] Add SFTP support to gpfdist for data ingestion Feature:Support for writing CloudBerry table data to a remote SFTP server to achieve backup functionality. gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Implement the `sftp_write` function to write CloudBerry table data to a remote SFTP server, thereby achieving backup functionality. --- src/backend/utils/misc/fstream/gfile.c | 49 +++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 8c1fd2b3a25..f347105da76 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -171,6 +171,19 @@ sftp_read(gfile_t *fd, void *ptr, size_t size) return i; } +static ssize_t +sftp_write(gfile_t *fd, void *ptr, size_t size) +{ + ssize_t i = 0; + do + i = libssh2_sftp_write(fd->sftp_handle, ptr, size); + while (i < 0 && errno == EINTR); + + if (i > 0) + fd->compressed_position += i; + return i; +} + static int sftp_close(gfile_t *fd) { @@ -452,7 +465,19 @@ gz_file_write_one_chunk(gfile_t *fd, int do_flush) } have = COMPRESSION_BUFFER_SIZE - z->s.avail_out; - if ( write_and_retry(fd, z->out, have) != have ) + if (fd->is_sftp) + { +#ifdef LIBSSH2 + if (sftp_write(fd, z->out, have) != have) + { + gfile_printf_then_putc_newline("failed to sftp write, the stream ends"); + 
(void)deflateEnd(&(z->s)); + ret = -1; + break; + } +#endif + } + else if ( write_and_retry(fd, z->out, have) != have ) { /* * presently gfile_close calls gz_file_close only for the on_write case so we don't need @@ -1446,6 +1471,11 @@ int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, cons memset(fd, 0, sizeof *fd); fd->is_sftp = TRUE; + if (flags != GFILE_OPEN_FOR_READ) + { + fd->is_write = TRUE; + } + int ans = sftp_open(fd, fpath, sftp_uname, sftp_passwd, sftp_hostaddr, sftp_port); if (ans == 0) { @@ -1474,6 +1504,7 @@ int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, cons fd->compressed_size = sta.filesize; fd->read = sftp_read; + fd->write = sftp_write; fd->close = sftp_close; /* @@ -1641,7 +1672,7 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char return -1; } } - + fd->session = libssh2_session_init(); if (!fd->session) return -1; @@ -1684,8 +1715,18 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char return -2; } - fd->sftp_handle = - libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0); + if (fd->is_write) + { + fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath, + LIBSSH2_FXF_WRITE | LIBSSH2_FXF_CREAT | LIBSSH2_FXF_TRUNC, + LIBSSH2_SFTP_S_IRUSR | LIBSSH2_SFTP_S_IWUSR | + LIBSSH2_SFTP_S_IRGRP | LIBSSH2_SFTP_S_IROTH); + } + else + { + fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0); + } + if (!(fd->sftp_handle)) { gfile_printf_then_putc_newline("Unable to open file with SFTP: %ld\n", From 7aff224057676fb32178c2891979ad8b053cb545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 19:59:56 +0800 Subject: [PATCH 10/24] Add SFTP support to gpfdist for data ingestion Implement the log rotation feature for gpfdist. gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. 
This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit addresses the issue of gpfdist logs continuously growing and occupying a large amount of disk space in a persistent working scenario. To avoid uncontrolled growth, the log rotation feature is implemented. The characteristics are as follows: 1) Logs will be rotated when their size exceeds 512MB; 2) Only two logs are kept in the log set, one is the latest current log, and the other is the previous rotated log. --- src/bin/gpfdist/gpfdist.c | 52 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index 96bbd05e12d..2591c6af634 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -360,6 +360,7 @@ struct request_t static int ggetpid(); static void log_gpfdist_status(); static void log_request_header(const request_t *r); +static void log_aging_gpfdist(); static void gprint(const request_t *r, const char* fmt, ...) pg_attribute_printf(2, 3); @@ -2078,6 +2079,55 @@ static void log_request_header(const request_t *r) gprintln(r, "%s:%s", r->in.req->hname[i], r->in.req->hvalue[i]); } +static void log_aging_gpfdist() +{ + struct stat filestats; // Structure to hold file statistics + char newfilename[256]; // Buffer to store the new filename + + if (stat(opt.l, &filestats) == 0 && filestats.st_size >= MAX_GPFDIST_LOGSIZE) + { + if(strlen(opt.l) > 256) + { + fprintf(stderr, "log file name is too long. 
please change log name under log_aging!\n"); + exit(1); + } + + snprintf(newfilename, sizeof(newfilename), "%s.old", opt.l); + + if(stat(newfilename, &filestats)) + remove(newfilename); + + rename(opt.l, newfilename); + + /* Redirect stderr and stdout to the log file */ + if (opt.l) + { + FILE *f_stderr; + FILE *f_stdout; + + f_stderr = freopen(opt.l, "a", stderr); + if (f_stderr == NULL) + { + fprintf(stderr, "failed to redirect stderr to log: %s under log_aging.\n", strerror(errno)); + exit(1); + } +#ifndef WIN32 + setlinebuf(stderr); +#endif + + f_stdout = freopen(opt.l, "a", stdout); + if (f_stdout == NULL) + { + fprintf(stderr, "failed to redirect stdout to log: %s under log_aging.\n", strerror(errno)); + exit(1); + } +#ifndef WIN32 + setlinebuf(stdout); +#endif + } + } +} + /* * do_read_request * @@ -4663,6 +4713,8 @@ static void do_close(int fd, short event, void *arg) apr_pool_destroy(r->pool); fflush(stdout); + + log_aging_gpfdist(); } /* From edb93c25673533386de043c9142f0bb26de5ef90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Tue, 19 Aug 2025 16:38:07 +0800 Subject: [PATCH 11/24] Add SFTP support to gpfdist for data ingestion Implement the log rotation feature for gpfdist. gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit addresses the issue of gpfdist logs continuously growing and occupying a large amount of disk space in a persistent working scenario. To avoid uncontrolled growth, the log rotation feature is implemented. 
The characteristics are as follows: Define the macro for log size as MAX_GPFDIST_LOGSIZE=512MB --- src/bin/gpfdist/gpfdist.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index 2591c6af634..d079a81b805 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -74,6 +74,8 @@ #define DEFAULT_COMPRESS_LEVEL 1 #define MAX_FRAME_SIZE 65536 +#define MAX_GPFDIST_LOGSIZE (512 * 1024 * 1024) // 512MB + /* A data block */ typedef struct blockhdr_t blockhdr_t; struct blockhdr_t From bcddd0bbbd24be2abbb95851663c8c8762578452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 12 Jul 2025 11:05:31 +0800 Subject: [PATCH 12/24] Extending gpfdist in Cloudberry Database to Support SFTP Protocol for Data Ingestion gpfdist is a file distribution program in Cloudberry that can load external data into the database in parallel. However, it has the drawback that data files must reside on the same machine as the tool. Therefore, extending it to support the SFTP protocol can address the above drawback and enable loading files from a remote server. Add the libssh2 library and specify the link. Implement remote data file reading using the libssh2 library with gpfdist. Extending gpfdist in Cloudberry Database to Support SFTP Protocol for Data Ingestion -- add LIBSSH2 macro ADD LIBSSH2 macro Label the SFTP-related code to indicate its characteristics.
--- src/backend/Makefile | 1 + src/backend/utils/misc/fstream/Makefile | 4 +- src/backend/utils/misc/fstream/fstream.c | 449 +++++++++++++++++++++-- src/backend/utils/misc/fstream/gfile.c | 269 +++++++++++++- src/bin/gpfdist/Makefile | 2 +- src/bin/gpfdist/gpfdist.c | 43 ++- src/common/Makefile | 1 + src/include/cdb/cdbsreh.h | 1 + src/include/fstream/fstream.h | 9 + src/include/fstream/gfile.h | 31 ++ 10 files changed, 763 insertions(+), 47 deletions(-) diff --git a/src/backend/Makefile b/src/backend/Makefile index 44dbe7f0e15..a10fc61a79a 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -67,6 +67,7 @@ LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) # Cloudberry uses threads in the backend LIBS := $(LIBS) -lpthread +LIBS := $(LIBS) -lssh2 ifeq ($(with_systemd),yes) LIBS += -lsystemd diff --git a/src/backend/utils/misc/fstream/Makefile b/src/backend/utils/misc/fstream/Makefile index c8cea254e8c..835f31c3f0f 100644 --- a/src/backend/utils/misc/fstream/Makefile +++ b/src/backend/utils/misc/fstream/Makefile @@ -9,7 +9,9 @@ subdir = src/backend/utils/misc/fstream top_builddir = ../../../../.. 
include $(top_builddir)/src/Makefile.global -override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) +LDFLAGS += -lssh2 + +override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) $(CPPFLAGS) OBJS = fstream.o gfile.o diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 56a730b98ad..46ea60ad628 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -30,6 +30,8 @@ #include #endif +#include + char* format_error(char* c1, char* c2); @@ -147,6 +149,36 @@ static void glob_and_copyfree(glob_and_copy_t *pglob) } } +static void glob_and_copyfree_sftp(glob_and_copy_t *pglob) +{ + if (pglob->gl_pathc) + { + int i; + + for (i = 0; i < pglob->gl_pathc; i++) + { + gfile_free(pglob->gl_pathv[i]); + gfile_free(pglob->gl_username[i]); + gfile_free(pglob->gl_passwd[i]); + gfile_free(pglob->gl_hostaddr[i]); + gfile_free(pglob->gl_port[i]); + } + + gfile_free(pglob->gl_pathv); + gfile_free(pglob->gl_username); + gfile_free(pglob->gl_passwd); + gfile_free(pglob->gl_hostaddr); + gfile_free(pglob->gl_port); + + pglob->gl_pathc = 0; + pglob->gl_pathv = 0; + pglob->gl_username = 0; + pglob->gl_passwd = 0; + pglob->gl_hostaddr = 0; + pglob->gl_port = 0; + } +} + const char* fstream_get_error(fstream_t*fs) { @@ -315,7 +347,12 @@ int fstream_close_with_error(fstream_t* fs, char* error) if(fs->buffer) gfile_free(fs->buffer); - glob_and_copyfree(&fs->glob); + if (fs->fd.is_sftp) + { + glob_and_copyfree_sftp(&fs->glob); + } + else + glob_and_copyfree(&fs->glob); gfile_close(&fs->fd); #ifdef GPFXDIST /* @@ -453,6 +490,310 @@ static int glob_path(fstream_t *fs, const char *path) return 0; } +int get_sftp_counts(const char *sftp_request) +{ + const char *start; + const char *end; + int count1 = 0; + int count2 = 0; + + start = (char *)sftp_request; + end = (char *)sftp_request; + + while (*start || *end) + { + if (*start == '<') + count1++; + start++; + if (*end == '>') + count2++; + end++; + } + + if (count1 != count2) + 
return -1; + + return count1; +} + +int ParseFilePathUri(char *uri_str, sftp_info_t *info) +{ + + int protocol_len = 0; + int len = 0; + char *start, *end; + + if (strncmp(uri_str, "sftp://", 7) == 0) + { + protocol_len = 7; + } + else + { + return -2; + } + + start = (char *)uri_str + protocol_len; + end = strchr(start, ':'); + if (end == NULL) + { + + return -2; + } + else + { + len = end - start; + + char *username = NULL; + username = (char *)gfile_malloc(len + 1); + if (username == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(username, start, len); + username[len] = '\0'; + strcpy(info->username, username); + gfile_free(username); + } + start = end + 1; + end = strchr(start, '@'); + + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *passwd = NULL; + passwd = (char *)gfile_malloc(len + 1); + if (passwd == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(passwd, start, len); + passwd[len] = '\0'; + strcpy(info->password, passwd); + gfile_free(passwd); + } + + start = end + 1; + if (strncmp(start, "[", 1) == 0) + { + end = strchr(start, ']'); + if (end == NULL) + return -2; + + len = end - start; + char *hostaddr6 = NULL; + hostaddr6 = (char *)gfile_malloc(len); + if (hostaddr6 == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(hostaddr6, start + 1, len - 1); + hostaddr6[len - 1] = '\0'; + strcpy(info->hostaddr, hostaddr6); + gfile_free(hostaddr6); + + end++; + } + + else + { + end = strchr(start, ':'); + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *hostaddr4 = NULL; + hostaddr4 = (char *)gfile_malloc(len + 1); + if (hostaddr4 == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(hostaddr4, start, len); + hostaddr4[len] = '\0'; + strcpy(info->hostaddr, hostaddr4); + gfile_free(hostaddr4); + } + } + + start = end + 1; + end = strchr(start, 
'/'); + if (end == NULL) + { + return -2; + } + else + { + len = end - start; + + char *port = NULL; + port = (char *)gfile_malloc(len + 1); + if (port == NULL) + { + gfile_printf_then_putc_newline("out of memory"); + return -1; + } + strncpy(port, start, len); + port[len] = '\0'; + strcpy(info->port, port); + gfile_free(port); + } + + start = end; + strcpy(info->fpath, start); + + return 0; +} + +static int order_by_hostaddr(const void *e1, const void *e2) +{ + struct in_addr ip1; + struct in_addr ip2; + int result = 0; + + inet_pton(AF_INET, ((struct sftp_info_t *)e1)->hostaddr, &ip1); + inet_pton(AF_INET, ((struct sftp_info_t *)e2)->hostaddr, &ip2); + + result = memcmp(&ip1, &ip2, sizeof(struct in_addr)); + + if (result > 0) + return 1; + else if (result < 0) + return -1; + else + return 0; +} + +static int fetch_sftp_paths(fstream_t *fs, const char *path) +{ + int counts = get_sftp_counts(path); + if(counts == -1) + { + gfile_printf_then_putc_newline("sftp path is Non-conforming, please check "); + return 1; + } + + if(counts == 0) + { + gfile_printf_then_putc_newline("sftp path is empty"); + return 1; + } + char **res = (char **)malloc(sizeof(char *) * counts); + + char **unames = (char **)gfile_malloc(sizeof(char *) * counts); + char **passwds = (char **)gfile_malloc(sizeof(char *) * counts); + char **hostaddrs = (char **)gfile_malloc(sizeof(char *) * counts); + char **ports = (char **)gfile_malloc(sizeof(char *) * counts); + char **fpaths = (char **)gfile_malloc(sizeof(char *) * counts); + + if (!unames || !passwds || !hostaddrs || !ports || !fpaths) + { + gfile_printf_then_putc_newline("out of memory"); + return 1; + } + + sftp_info_t sftp_info_lists[64]; + + fs->glob.gl_pathc = 0; + fs->glob.gl_username = unames; + fs->glob.gl_passwd = passwds; + fs->glob.gl_hostaddr = hostaddrs; + fs->glob.gl_port = ports; + fs->glob.gl_pathv = fpaths; + + { + const char *start; + const char *end; + start = (char *)path; + end = (char *)path; + + for (int i = 0; i < 
counts; i++) + { + while (*start != '<') + start++; + while (*end != '>') + end++; + int len = end - start - 1; + char *req = (char *)gfile_malloc(len + 1); + if (!req) + { + gfile_printf_then_putc_newline("out of memory"); + return 1; + } + strncpy(req, start + 1, len); + req[len] = '\0'; + res[i] = strdup(req); + gfile_free(req); + start++; + end++; + } + } + + for (int i = 0; i < counts; i++) + { + int parse_result = ParseFilePathUri(res[i], &sftp_info_lists[i]); + if(parse_result == 0) + continue; + else + { + for (int j = 0; j < counts; j++) + { + if(res[i]) + free(res[i]); + } + free(res); + } + } + + qsort(sftp_info_lists, counts, sizeof(sftp_info_t), order_by_hostaddr); + + for (int j = 0; j < counts; j++) + { + char *tmp_uname = sftp_info_lists[j].username; + char *tmp_passwd = sftp_info_lists[j].password; + char *tmp_hostaddr = sftp_info_lists[j].hostaddr; + char *tmp_port = sftp_info_lists[j].port; + char *tmp_fpath = sftp_info_lists[j].fpath; + + *unames = (char *)gfile_malloc(strlen(tmp_uname) + 1); + *passwds = (char *)gfile_malloc(strlen(tmp_passwd) + 1); + *hostaddrs = (char *)gfile_malloc(strlen(tmp_hostaddr) + 1); + *ports = (char *)gfile_malloc(strlen(tmp_port) + 1); + *fpaths = (char *)gfile_malloc(strlen(tmp_fpath) + 1); + + if (!(*unames) || !(*passwds) || !(*hostaddrs) || !(*ports) || !(*fpaths)) + { + gfile_printf_then_putc_newline("out of memory!!!"); + return 1; + } + + strcpy(*unames++, tmp_uname); + strcpy(*passwds++, tmp_passwd); + strcpy(*hostaddrs++, tmp_hostaddr); + strcpy(*ports++, tmp_port); + strcpy(*fpaths++, tmp_fpath); + + fs->glob.gl_pathc++; + } + + for (int i = 0; i < counts; i++) + { + free(res[i]); + } + free(res); + return 0; +} #ifdef GPFXDIST /* @@ -501,6 +842,7 @@ fstream_open(const char *path, const struct fstream_options *options, { int i; fstream_t* fs; + bool_t is_sftp = FALSE; *response_code = 500; *response_string = "Internal Server Error"; @@ -515,27 +857,44 @@ fstream_open(const char *path, const struct 
fstream_options *options, fs->options = *options; fs->buffer = gfile_malloc(options->bufsize); + if (strncmp(path, "/glob)) + else { - if (expand_directories(fs)) + if (glob_path(fs, path)) { fstream_close(fs); return 0; } + + /* + * If the list of files in our filestrem includes a directory name, expand + * the directory and add all the files inside of it. + */ + if (fpath_all_directories(&fs->glob)) + { + if (expand_directories(fs)) + { + fstream_close(fs); + return 0; + } + } } /* @@ -543,11 +902,14 @@ fstream_open(const char *path, const struct fstream_options *options, */ if (fs->glob.gl_pathc == 0) { - gfile_printf_then_putc_newline("fstream bad path: %s", path); - fstream_close(fs); - *response_code = 404; - *response_string = "No matching file(s) found"; - return 0; + if(!is_sftp) + { + gfile_printf_then_putc_newline("fstream bad path: %s", path); + fstream_close(fs); + *response_code = 404; + *response_string = "No matching file(s) found"; + return 0; + } } if (fs->glob.gl_pathc != 1 && options->forwrite) @@ -649,14 +1011,28 @@ fstream_open(const char *path, const struct fstream_options *options, struct gpfxdist_t* transform = (i == 0) ? 
options->transform : NULL; gfile_close(&fs->fd); - - if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), - response_code, response_string, transform)) +#ifdef LIBSSH2 + if (is_sftp) { - gfile_printf_then_putc_newline("fstream unable to open file %s", - fs->glob.gl_pathv[i]); - fstream_close(fs); - return 0; + if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[i], fs->glob.gl_username[i], fs->glob.gl_passwd[i], + fs->glob.gl_hostaddr[i], fs->glob.gl_port[i], gfile_open_flags(options->forwrite, options->usesync), response_code, response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", fs->glob.gl_pathv[i]); + fstream_close(fs); + return 0; + } + } +#endif + else + { + if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), + response_code, response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[i]); + fstream_close(fs); + return 0; + } } fs->compressed_size += gfile_get_compressed_size(&fs->fd); @@ -707,15 +1083,30 @@ static int nextFile(fstream_t*fs) if (fs->fidx < fs->glob.gl_pathc) { fs->skip_header_line = fs->options.header; - - if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, - &response_code, &response_string, transform)) +#ifdef LIBSSH2 + if (fs->fd.is_sftp) { - gfile_printf_then_putc_newline("fstream unable to open file %s", - fs->glob.gl_pathv[fs->fidx]); - fs->ferror = "unable to open file"; - return 1; + if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[fs->fidx], fs->glob.gl_username[fs->fidx], fs->glob.gl_passwd[fs->fidx], + fs->glob.gl_hostaddr[fs->fidx], fs->glob.gl_port[fs->fidx], GFILE_OPEN_FOR_READ, &response_code, &response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[fs->fidx]); + fs->ferror = "unable to open file"; + return 1; + } } +#endif + else + { + if 
(gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, + &response_code, &response_string, transform)) + { + gfile_printf_then_putc_newline("fstream unable to open file %s", + fs->glob.gl_pathv[fs->fidx]); + fs->ferror = "unable to open file"; + return 1; + } + } } return 0; diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 070ca1c649c..8ce1213d412 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -40,6 +40,14 @@ #include /* for flock */ #include +#ifdef LIBSSH2 +#include +#include +#endif + +#include +#include + #ifdef WIN32 #include #define snprintf _snprintf @@ -146,6 +154,47 @@ writewinpipe(gfile_t* fd, void* ptr, size_t size) return i; } +#ifdef LIBSSH2 +static ssize_t +sftp_read(gfile_t *fd, void *ptr, size_t size) +{ + ssize_t i = 0; + do + i = libssh2_sftp_read(fd->sftp_handle, ptr, size); + while (i < 0 && errno == EINTR); + + if (i < 0) + gfile_printf_then_putc_newline("i is %ld", i); + + if (i > 0) + fd->compressed_position += i; + return i; +} + +static int +sftp_close(gfile_t *fd) +{ + if (fd->sftp_handle) + libssh2_sftp_close(fd->sftp_handle); + if (fd->sftp_session) + libssh2_sftp_shutdown(fd->sftp_session); + if (fd->session) + { + libssh2_session_disconnect(fd->session, "Normal Shutdown"); + libssh2_session_free(fd->session); + } + +#ifdef WIN32 + closesocket(fd->sock); +#else + close(fd->sock); +#endif + fd->sock = -1; + libssh2_exit(); + return 0; +} +#endif + #ifdef HAVE_LIBBZ2 static void * bz_alloc(void *a, int b, int c) @@ -1334,23 +1383,32 @@ gfile_close(gfile_t*fd) { fd->close(fd); } - - if (fd->is_win_pipe) +#ifdef LIBSSH2 + if (fd->is_sftp) { - fd->close(fd); + sftp_close(fd); } +#endif else { - if(fd->held_pipe_lock) + if (fd->is_win_pipe) { + fd->close(fd); + } + else + { + if(fd->held_pipe_lock) + { #ifndef WIN32 - flock (fd->fd.filefd, LOCK_UN); + flock (fd->fd.filefd, LOCK_UN); #endif + } + ret = 
close_filefd(fd->fd.filefd); + if (ret == -1) + ret = 1; } - ret = close_filefd(fd->fd.filefd); - if (ret == -1) - ret = 1; } + } fd->read = 0; fd->close = 0; @@ -1358,6 +1416,96 @@ gfile_close(gfile_t*fd) return ret; } +#ifdef LIBSSH2 +int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform) +{ + const char *s = strrchr(fpath, '.'); + + //struct stat sta; + LIBSSH2_SFTP_ATTRIBUTES sta; + memset(&sta, 0, sizeof(sta)); + memset(fd, 0, sizeof *fd); + fd->is_sftp = TRUE; + + int ans = sftp_open(fd, fpath, sftp_uname, sftp_passwd, sftp_hostaddr, sftp_port); + if (ans == 0) + { + gfile_printf_then_putc_newline("looks like a ftp handle"); + gfile_printf_then_putc_newline("path is %s", fpath); + } + else + { + gfile_printf_then_putc_newline("failed open a ftp handle, please check sftp-information: ip, port, passwd and filename"); + if (ans == -2) + { + sftp_free(fd); + } + return 1; + } + + if (flags == GFILE_OPEN_FOR_READ) + { + if (0 != libssh2_sftp_stat(fd->sftp_session, fpath, &sta)) + { + gfile_printf_then_putc_newline("libssh2 libssh2_sftp_stat failed"); + return 1; + } + } + + fd->compressed_size = sta.filesize; + + fd->read = sftp_read; + fd->close = sftp_close; + + /* + * delegate remaining setup work to an appropriate open routine + * or return an error if we can't handle the type + */ + if (s && strcasecmp(s, ".gz") == 0) + { +#ifndef HAVE_LIBZ + gfile_printf_then_putc_newline(".gz not supported"); +#else + /* + * flag used by function gfile close + */ + fd->compression = GZ_COMPRESSION; + + if (flags != GFILE_OPEN_FOR_READ) + { + fd->is_write = TRUE; + } + + return gz_file_open(fd); +#endif + } + else if (s && strcasecmp(s, ".bz2") == 0) + { +#ifndef HAVE_LIBBZ2 + gfile_printf_then_putc_newline(".bz2 not supported"); +#else + fd->compression = BZ_COMPRESSION; + if (flags != 
GFILE_OPEN_FOR_READ) + gfile_printf_then_putc_newline(".bz2 not yet supported for writable tables"); + + return bz_file_open(fd); +#endif + } + else if (s && strcasecmp(s, ".z") == 0) + gfile_printf_then_putc_newline("gfile compression .z file is not supported"); + else if (s && strcasecmp(s, ".zip") == 0) + gfile_printf_then_putc_newline("gfile compression zip is not supported"); + else + return 0; + + *response_code = 415; + *response_string = "Unsupported File Type"; + + return 1; +} +#endif + ssize_t gfile_read(gfile_t *fd, void *ptr, size_t len) { @@ -1407,3 +1555,108 @@ off_t gfile_get_compressed_position(gfile_t *fd) { return fd->compressed_position; } + +#ifdef LIBSSH2 +int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port) +{ + int rc; + int auth_pw = 0; + char *userauthlist; + uint16_t port; + unsigned long hostaddr; + struct sockaddr_in sin; + + hostaddr = inet_addr(sftp_hostaddr); + + sscanf(sftp_port, "%hu", &port); + + rc = libssh2_init(0); + + if (rc != 0) + { + gfile_printf_then_putc_newline("libssh2 initialization failed (%d)\n", rc); + return -1; + } + + fd->sock = -1; + + fd->sock = socket(AF_INET, SOCK_STREAM, 0); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = hostaddr; + + if (connect(fd->sock, (struct sockaddr *)(&sin), + sizeof(struct sockaddr_in)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } + + fd->session = libssh2_session_init(); + if (!fd->session) + return -1; + + /* Since we have set non-blocking, tell libssh2 we are blocking */ + libssh2_session_set_blocking(fd->session, 1); + + /* ... start it up. 
This will trade welcome banners, exchange keys, + * and setup crypto, compression, and MAC layers + */ + rc = libssh2_session_handshake(fd->session, fd->sock); + if (rc) + { + gfile_printf_then_putc_newline("Failure establishing SSH session: %d\n", rc); + return -1; + } + + /* check what authentication methods are available */ + userauthlist = libssh2_userauth_list(fd->session, sftp_uname, strlen(sftp_uname)); + if (strstr(userauthlist, "password") != NULL) + { + auth_pw |= 1; + } + + if (auth_pw & 1) + { + if (libssh2_userauth_password(fd->session, sftp_uname, sftp_passwd)) + { + gfile_printf_then_putc_newline("Authentication by password failed.\n"); + return -2; + } + } + + gfile_printf_then_putc_newline("libssh2_sftp_init()!\n"); + fd->sftp_session = libssh2_sftp_init(fd->session); + + if (!(fd->sftp_session)) + { + gfile_printf_then_putc_newline("Unable to init SFTP session\n"); + return -2; + } + + fd->sftp_handle = + libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0); + if (!(fd->sftp_handle)) + { + gfile_printf_then_putc_newline("Unable to open file with SFTP: %ld\n", + libssh2_sftp_last_error(fd->sftp_session)); + return -2; + } + + return 0; +} +void sftp_free(gfile_t *fd) +{ + libssh2_session_disconnect(fd->session, "Normal Shutdown"); + libssh2_session_free(fd->session); + +#ifdef WIN32 + closesocket(fd->sock); +#else + close(fd->sock); +#endif + fd->sock = -1; + libssh2_exit(); +} +#endif \ No newline at end of file diff --git a/src/bin/gpfdist/Makefile b/src/bin/gpfdist/Makefile index 3739e3835c5..cb84109b309 100644 --- a/src/bin/gpfdist/Makefile +++ b/src/bin/gpfdist/Makefile @@ -25,7 +25,7 @@ ifeq ($(PORTNAME),win32) OBJS += $(top_builddir)/src/port/glob.o endif -LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs) +LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs) -lssh2 all: gpfdist$(X) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index ad7fb868313..431ea9362ee 100644 --- a/src/bin/gpfdist/gpfdist.c +++ 
b/src/bin/gpfdist/gpfdist.c @@ -1274,6 +1274,10 @@ static void request_end(request_t* r, int error, const char* errmsg, int sendhea static int local_send(request_t *r, const char* buf, int buflen) { int n = gpfdist_send(r, buf, buflen); + + int is_sftp_type = 0; + if(strncmp(r->path, "/session; retblock->bot = retblock->top = 0; @@ -1663,20 +1667,36 @@ static void sessions_cleanup(void) static int session_attach(request_t* r) { char key[1024]; + char tmp_key[1024]; session_t* session = NULL; - + int is_sftp_mode = 0; /* * create the session key (tid:path) */ - if (sizeof(key) - 1 == apr_snprintf(key, sizeof(key), "%s:%s", - r->tid, r->path)) + if (strncmp(r->path, "/tid) + strlen(r->path) >= 1024) + { + gwarning(NULL, "sftp_path is too long"); + request_end(r, 1, 0, 0); + return -1; + } } + else + { + if (sizeof(tmp_key) - 1 == apr_snprintf(tmp_key, sizeof(tmp_key), "%s:%s", + r->tid, r->path)) + { + http_error(r, FDIST_BAD_REQUEST, "path too long"); + request_end(r, 1, 0, 0); + return -1; + } + } + apr_snprintf(key, sizeof(key), "%s:%s", r->tid, r->path); /* check if such session already exists in hashtable */ session = apr_hash_get(gcb.session.tab, key, APR_HASH_KEY_STRING); @@ -1770,7 +1790,8 @@ static int session_attach(request_t* r) fstream_options.transform->errfile = r->trans.errfile; fstream_options.transform->stderr_server = r->trans.stderr_server; } - gprintlnif(r, "r->path %s", r->path); + if (is_sftp_mode == 0) + gprintlnif(r, "r->path %s", r->path); #endif /* try opening the fstream */ @@ -2070,6 +2091,7 @@ static void do_read_request(int fd, short event, void* arg) char* p = NULL; char* pp = NULL; char* path = NULL; + bool is_sftp_type = false; /* If we timeout, close the request. */ if (event & EV_TIMEOUT) @@ -2209,6 +2231,11 @@ static void do_read_request(int fd, short event, void* arg) /* we forced in a filename with the hidden -f option. 
use it */ r->path = opt.f; } + else if (!strncmp(path, "/path = apr_psprintf(r->pool, "%s", path); + } else { if(request_set_path(r, opt.d, p, pp, path) != 0) diff --git a/src/common/Makefile b/src/common/Makefile index 4549e6a24fb..469190571a8 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -42,6 +42,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\"" override CPPFLAGS := -DFRONTEND -I. -I$(top_srcdir)/src/common $(CPPFLAGS) LIBS += $(PTHREAD_LIBS) +LIBS += -lssh2 # If you add objects here, see also src/tools/msvc/Mkvcbuild.pm diff --git a/src/include/cdb/cdbsreh.h b/src/include/cdb/cdbsreh.h index fdb29716d3f..97ece750a1e 100644 --- a/src/include/cdb/cdbsreh.h +++ b/src/include/cdb/cdbsreh.h @@ -100,5 +100,6 @@ extern Datum gp_truncate_error_log(PG_FUNCTION_ARGS); extern Datum gp_read_persistent_error_log(PG_FUNCTION_ARGS); extern Datum gp_truncate_persistent_error_log(PG_FUNCTION_ARGS); +extern int get_sftpfile_numbers(const char *path); #endif /* CDBSREH_H */ diff --git a/src/include/fstream/fstream.h b/src/include/fstream/fstream.h index 2c848490b88..e490868e4bc 100644 --- a/src/include/fstream/fstream.h +++ b/src/include/fstream/fstream.h @@ -23,6 +23,12 @@ typedef struct { int gl_pathc; char** gl_pathv; + char** gl_username; + char** gl_passwd; + char** gl_keyfile1; + char** gl_keyfile2; + char** gl_hostaddr; + char** gl_port; } glob_and_copy_t; struct fstream_options{ @@ -85,4 +91,7 @@ int fstream_close_with_error(fstream_t* fs, char* msg); void fstream_close(fstream_t* fs); bool_t fstream_is_win_pipe(fstream_t *fs); +int ParseFilePathUri(char *uri_str, sftp_info_t *info); +int get_sftp_counts(const char *sftp_request); + #endif diff --git a/src/include/fstream/gfile.h b/src/include/fstream/gfile.h index 565a7d910c9..16954c9dece 100644 --- a/src/include/fstream/gfile.h +++ b/src/include/fstream/gfile.h @@ -12,6 +12,11 @@ #include #endif +#ifdef LIBSSH2 +#include +#include +#endif + #ifdef WIN32 #include #endif @@ -57,6 +62,7 @@ typedef struct 
gfile_t off_t compressed_size,compressed_position; bool_t is_win_pipe; bool_t held_pipe_lock; /* Whether held flock on pipe file, used to restrict only one reader of pipe */ + bool_t is_sftp; union { @@ -66,6 +72,11 @@ typedef struct gfile_t #endif } fd; + LIBSSH2_SESSION *session; + LIBSSH2_SFTP *sftp_session; + LIBSSH2_SFTP_HANDLE *sftp_handle; + int sock; + union { int txt; @@ -85,6 +96,19 @@ typedef struct gfile_t struct gpfxdist_t* transform; } gfile_t; +/* Struct of sftp info */ +typedef struct sftp_info_t sftp_info_t; +struct sftp_info_t +{ + char username[32]; + char password[64]; + char keyfile1[64]; + char keyfile2[64]; + char hostaddr[64]; + char port[16]; + char fpath[256]; +}; + /* * MPP-13817 (support opening files without O_SYNC) */ @@ -103,4 +127,11 @@ void gfile_printf_then_putc_newline(const char*format,...) pg_attribute_printf(1 void*gfile_malloc(size_t size); void gfile_free(void*a); +#ifdef LIBSSH2 +int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform); +extern int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr, + const char *sftp_port); +extern void sftp_free(gfile_t *fd); +#endif #endif From 2c4cc5907c02f56818de170995b9ee724a106e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Fri, 18 Jul 2025 15:47:24 +0800 Subject: [PATCH 13/24] Feature: Add SFTP support to gpfdist for data ingestion gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. 
This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: when configure is run with --enable-gpfdist, it now checks for libssh2 dynamically and, if the library is found, defines the LIBSSH2 macro. --- configure | 91 ++++++++++++++++++++++++ configure.ac | 14 ++++ src/backend/utils/misc/fstream/fstream.c | 5 +- 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/configure b/configure index 44d3fae95b6..3aa7d333f4e 100755 --- a/configure +++ b/configure @@ -817,6 +817,8 @@ enable_faultinjector enable_debug_extensions enable_pxf enable_gpfdist +LIBSSH2_LIBS +LIBSSH2_CFLAGS enable_strong_random enable_rpath default_port @@ -976,6 +978,8 @@ XML2_CFLAGS XML2_LIBS LZ4_CFLAGS LZ4_LIBS +LIBSSH2_LIBS +LIBSSH2_CFLAGS LDFLAGS_EX LDFLAGS_SL PERL @@ -1724,6 +1728,10 @@ Some influential environment variables: XML2_LIBS linker flags for XML2, overriding pkg-config LZ4_CFLAGS C compiler flags for LZ4, overriding pkg-config LZ4_LIBS linker flags for LZ4, overriding pkg-config + LIBSSH2_CFLAGS + C compiler flags for LIBSSH2, overriding pkg-config + LIBSSH2_LIBS + linker flags for LIBSSH2, overriding pkg-config LDFLAGS_EX extra linker flags for linking executables only LDFLAGS_SL extra linker flags for linking shared libraries only PERL Perl program @@ -15218,6 +15226,89 @@ fi LIBS="$_LIBS" fi +# +# LIBSSH2 +# +# +if test "$enable_gpfdist" = yes; then + # Check libssh2 >= 1.0.0 + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libssh2 >= 1.0.0" >&5 +$as_echo_n "checking for libssh2 >= 1.0.0... " >&6; } + +if test -n "$LIBSSH2_CFLAGS"; then + pkg_cv_LIBSSH2_CFLAGS="$LIBSSH2_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? 
= $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBSSH2_CFLAGS=`$PKG_CONFIG --cflags "libssh2 >= 1.1.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$LIBSSH2_LIBS"; then + pkg_cv_LIBSSH2_LIBS="$LIBSSH2_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBSSH2_LIBS=`$PKG_CONFIG --libs "libssh2 >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1` + else + LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$LIBSSH2_PKG_ERRORS" >&5 + + as_fn_error $? "libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5 + +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + as_fn_error $? 
"libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5 + +else + LIBSSH2_CFLAGS=$pkg_cv_LIBSSH2_CFLAGS + LIBSSH2_LIBS=$pkg_cv_LIBSSH2_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +$as_echo "#define LIBSSH2 1" >>confdefs.h + +fi + + # # SSL Library # diff --git a/configure.ac b/configure.ac index 0d0529fc35f..1e6852b5bd5 100644 --- a/configure.ac +++ b/configure.ac @@ -225,6 +225,15 @@ PGAC_ARG_BOOL(enable, gpfdist, yes, [do not use gpfdist]) AC_SUBST(enable_gpfdist) +if test "$enable_gpfdist" = yes; then + # Check libssh2 >= 1.0.0 + PKG_CHECK_MODULES([LIBSSH2], [libssh2 >= 1.0.0], + [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], + [AC_MSG_ERROR([libssh2 >= 1.0.0 is required for gpfdist support])] + ) + AC_MSG_RESULT([checking whether to build with gpfdist support ... yes]) +fi + # # pxf # @@ -1659,6 +1668,10 @@ if test "$enable_gpfdist" = yes ; then EVENT_LIBS="$EVENT_LIBS" AC_SUBST(EVENT_LIBS) + AC_SEARCH_LIBS(libssh2_init, [libssh2], [have_libssh2=yes; LIBSSH2_LIBS=" -lssh2"], [AC_MSG_ERROR([libssh2 is required for gpfdist])]) + AC_SUBST(LIBSSH2_LIBS) + AC_SUBST(have_libssh2) + AC_SEARCH_LIBS(yaml_parser_initialize, [yaml], [have_yaml=yes; YAML_LIBS=" -lyaml"], [AC_MSG_WARN([libyaml is not found. disabling transformations for gpfdist.])]) AC_SUBST(YAML_LIBS) AC_SUBST(have_yaml) @@ -1957,6 +1970,7 @@ if test "$enable_gpfdist" = yes; then AC_CHECK_HEADERS(yaml.h, [], [AC_MSG_WARN([header file is not found. 
disabling transformations for gpfdist.])]) AC_CHECK_HEADERS(event.h, [], [AC_MSG_ERROR([header file is required for gpfdist])]) + AC_CHECK_HEADERS(libssh2, [], [AC_MSG_ERROR([header file is required for gpfdist])]) ac_save_CPPFLAGS=$CPPFLAGS CPPFLAGS="$apr_includes $CPPFLAGS" diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 46ea60ad628..6a0f002a91a 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -1095,7 +1095,7 @@ static int nextFile(fstream_t*fs) return 1; } } -#endif +#else else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, @@ -1106,7 +1106,8 @@ static int nextFile(fstream_t*fs) fs->ferror = "unable to open file"; return 1; } - } + } +#endif } return 0; From b0bda87e04116f31c8033bd0dfb18a53b326c9e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 19 Jul 2025 16:13:31 +0800 Subject: [PATCH 14/24] Feature: Add SFTP support to gpfdist for data ingestion gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Add the `LIBSSH2` preprocessor macro to conditionally compile the new SFTP-related code. When configure is run with --enable-gpfdist, it checks for libssh2 dynamically and defines the LIBSSH2 macro if the library is found; otherwise it prints a warning.
--- configure.ac | 6 +++--- src/backend/utils/misc/fstream/fstream.c | 10 +++++----- src/backend/utils/misc/fstream/gfile.c | 5 +++-- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/configure.ac b/configure.ac index 1e6852b5bd5..6ea9e57fd6c 100644 --- a/configure.ac +++ b/configure.ac @@ -228,9 +228,9 @@ AC_SUBST(enable_gpfdist) if test "$enable_gpfdist" = yes; then # Check libssh2 >= 1.0.0 PKG_CHECK_MODULES([LIBSSH2], [libssh2 >= 1.0.0], - [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], - [AC_MSG_ERROR([libssh2 >= 1.0.0 is required for gpfdist support])] - ) + [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])], + [AC_MSG_WARN([libssh2 >= 1.0.0 not found, gpfdist will build without libssh2 support])] +) AC_MSG_RESULT([checking whether to build with gpfdist support ... yes]) fi diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c index 6a0f002a91a..cbb47dedef1 100644 --- a/src/backend/utils/misc/fstream/fstream.c +++ b/src/backend/utils/misc/fstream/fstream.c @@ -1011,9 +1011,10 @@ fstream_open(const char *path, const struct fstream_options *options, struct gpfxdist_t* transform = (i == 0) ? 
options->transform : NULL; gfile_close(&fs->fd); -#ifdef LIBSSH2 + if (is_sftp) { +#ifdef LIBSSH2 if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[i], fs->glob.gl_username[i], fs->glob.gl_passwd[i], fs->glob.gl_hostaddr[i], fs->glob.gl_port[i], gfile_open_flags(options->forwrite, options->usesync), response_code, response_string, transform)) { @@ -1021,8 +1022,8 @@ fstream_open(const char *path, const struct fstream_options *options, fstream_close(fs); return 0; } - } #endif + } else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync), @@ -1083,9 +1084,9 @@ static int nextFile(fstream_t*fs) if (fs->fidx < fs->glob.gl_pathc) { fs->skip_header_line = fs->options.header; -#ifdef LIBSSH2 if (fs->fd.is_sftp) { +#ifdef LIBSSH2 if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[fs->fidx], fs->glob.gl_username[fs->fidx], fs->glob.gl_passwd[fs->fidx], fs->glob.gl_hostaddr[fs->fidx], fs->glob.gl_port[fs->fidx], GFILE_OPEN_FOR_READ, &response_code, &response_string, transform)) { @@ -1094,8 +1095,8 @@ static int nextFile(fstream_t*fs) fs->ferror = "unable to open file"; return 1; } +#endif } -#else else { if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ, @@ -1107,7 +1108,6 @@ static int nextFile(fstream_t*fs) return 1; } } -#endif } return 0; diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 8ce1213d412..0925a7c07e2 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -1383,12 +1383,13 @@ gfile_close(gfile_t*fd) { fd->close(fd); } -#ifdef LIBSSH2 if (fd->is_sftp) { +#ifdef LIBSSH2 sftp_close(fd); - } #endif + } + else { if (fd->is_win_pipe) From 976cc1516ac931f072c16fad2f5c8a55283e6aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 21 Jul 2025 19:34:59 +0800 Subject: [PATCH 15/24] Feature: Add SFTP support to gpfdist for data gpfdist, Cloudberry's parallel file distribution 
program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Fix the compilation errors related to the libssh2 library option in the configure file. --- configure | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/configure b/configure index 3aa7d333f4e..dffd3ca968b 100755 --- a/configure +++ b/configure @@ -15304,11 +15304,10 @@ else { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 $as_echo "yes" >&6; } +fi $as_echo "#define LIBSSH2 1" >>confdefs.h - fi - # # SSL Library # From b28329706233470dfa162512fd77be37a42ff468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 26 Jul 2025 11:07:46 +0800 Subject: [PATCH 16/24] Add SFTP support to gpfdist for data loading gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key changes include: Fix the compilation errors related to the libssh2 library option in the configure file.
--- src/include/fstream/gfile.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/include/fstream/gfile.h b/src/include/fstream/gfile.h index 16954c9dece..0bc01f644e9 100644 --- a/src/include/fstream/gfile.h +++ b/src/include/fstream/gfile.h @@ -72,10 +72,12 @@ typedef struct gfile_t #endif } fd; +#ifdef LIBSSH2 LIBSSH2_SESSION *session; LIBSSH2_SFTP *sftp_session; LIBSSH2_SFTP_HANDLE *sftp_handle; int sock; +#endif union { From 6d2f45c21c599d28f66cb717655b8025bcd0e289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 26 Jul 2025 10:17:08 +0800 Subject: [PATCH 17/24] Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of bz Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Implement the loading of .bz2 files by utilizing the read functions provided by the libssh2 library. Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of bz(bz2) Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. 
Key change information: Implement the loading of .bz2 files by utilizing the read functions provided by the libssh2 library. --- src/backend/utils/misc/fstream/gfile.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 0925a7c07e2..f8336a2b7d4 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -241,8 +241,17 @@ bz_file_read(gfile_t *fd, void *ptr, size_t len) while (z->in_size < sizeof z->in) { - s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - - z->in_size); + if (fd->is_sftp) + { +#ifdef LIBSSH2 + gfile_printf_then_putc_newline("sftp_read : Read bz files from an SFTP server"); + s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size); +#endif + } + + else + s = read_and_retry(fd, z->in + z->in_size, sizeof z->in + - z->in_size); if (s == 0) break; if (s < 0) From 47fabe8b80a1d4eedff7ea991ea0dda3659b6a25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 10:38:13 +0800 Subject: [PATCH 18/24] Add SFTP support to gpfdist for data ingestion Feature:Supporting the Loading of gz Format Files gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Implement the loading of gz files by utilizing the read functions provided by the libssh2 library. 
--- src/backend/utils/misc/fstream/gfile.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index f8336a2b7d4..9ca0931a37d 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -364,7 +364,15 @@ gz_file_read(gfile_t* fd, void* ptr, size_t len) */ while (z->in_size < sizeof z->in) { - s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size); + if (fd->is_sftp) + { +#ifdef LIBSSH2 + gfile_printf_then_putc_newline("sftp_read : Read gz files from an SFTP server"); + s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size); +#endif + } + else + s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size); if (s == 0) { From 14e20c1556ab94d739f8bca640163657860302aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Mon, 18 Aug 2025 11:20:13 +0800 Subject: [PATCH 19/24] Add SFTP support to gpfdist for data ingestion Feature:Support for SFTP server data access with IPv6 addresses. gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit extends gpfdist to support the SFTP protocol, enabling users to ingest data directly from remote servers. This enhancement streamlines ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP locations. Key change information: Support for SFTP server data access based on address type, including both IPv4 and IPv6 addresses. 
--- src/backend/utils/misc/fstream/gfile.c | 51 +++++++++++++++++++++----- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c index 9ca0931a37d..8c1fd2b3a25 100644 --- a/src/backend/utils/misc/fstream/gfile.c +++ b/src/backend/utils/misc/fstream/gfile.c @@ -1584,8 +1584,23 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char uint16_t port; unsigned long hostaddr; struct sockaddr_in sin; + struct sockaddr_in6 sin6; + int is_ipv6 = 0; - hostaddr = inet_addr(sftp_hostaddr); + if (strchr(sftp_hostaddr, ':')) + { + is_ipv6 = 1; + } + if (!is_ipv6) + { + hostaddr = inet_addr(sftp_hostaddr); + } + + else + { + bzero(&sin6, sizeof(sin6)); + inet_pton(AF_INET6, sftp_hostaddr, &sin6.sin6_addr); + } sscanf(sftp_port, "%hu", &port); @@ -1599,18 +1614,34 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char fd->sock = -1; - fd->sock = socket(AF_INET, SOCK_STREAM, 0); - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - sin.sin_addr.s_addr = hostaddr; - - if (connect(fd->sock, (struct sockaddr *)(&sin), - sizeof(struct sockaddr_in)) != 0) + if (!is_ipv6) { - gfile_printf_then_putc_newline("failed to connect!\n"); - return -1; + fd->sock = socket(AF_INET, SOCK_STREAM, 0); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = hostaddr; + + if (connect(fd->sock, (struct sockaddr *)(&sin), + sizeof(struct sockaddr_in)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } } + else + { + fd->sock = socket(AF_INET6, SOCK_STREAM, 0); + sin6.sin6_family = AF_INET6; + sin6.sin6_port = htons(port); + if (connect(fd->sock, (struct sockaddr *)(&sin6), + sizeof(struct sockaddr_in6)) != 0) + { + gfile_printf_then_putc_newline("failed to connect!\n"); + return -1; + } + } + fd->session = libssh2_session_init(); if (!fd->session) return -1; From 
bed2beedc8d289b9d8f372fa04eff4cd647863fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?=
Date: Mon, 18 Aug 2025 19:49:30 +0800
Subject: [PATCH 20/24] Add SFTP support to gpfdist for data ingestion

Feature:Support for writing CloudBerry table data to a remote SFTP server
to achieve backup functionality.

gpfdist, Cloudberry's parallel file distribution program, traditionally
required data files to be co-located with the gpfdist process. This
limitation made it cumbersome to load data from remote servers, often
requiring an extra data transfer step.

This commit extends gpfdist to support the SFTP protocol, enabling users
to ingest data directly from remote servers. This enhancement streamlines
ETL workflows by allowing `CREATE EXTERNAL TABLE` to specify SFTP
locations.

Key change information:
Implement the `sftp_write` function to write CloudBerry table data to a
remote SFTP server, thereby achieving backup functionality.
---
 src/backend/utils/misc/fstream/gfile.c | 75 ++++++++++++++++++++++++--
 1 file changed, 71 insertions(+), 4 deletions(-)

diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c
index 8c1fd2b3a25..f347105da76 100644
--- a/src/backend/utils/misc/fstream/gfile.c
+++ b/src/backend/utils/misc/fstream/gfile.c
@@ -171,6 +171,45 @@ sftp_read(gfile_t *fd, void *ptr, size_t size)
 	return i;
 }
 
+/*
+ * sftp_write
+ *
+ * Write size bytes from ptr to the remote file open on fd->sftp_handle and
+ * advance fd->compressed_position by the number of bytes actually sent.
+ *
+ * libssh2_sftp_write() may accept fewer bytes than requested, so it is
+ * called repeatedly until the whole buffer has been sent.  It reports
+ * failure through a negative return value rather than errno, so errno is
+ * not consulted.
+ *
+ * Returns the number of bytes written (size on success, less if the
+ * transfer stopped part-way), or the negative libssh2 return value when
+ * nothing could be written.
+ */
+static ssize_t
+sftp_write(gfile_t *fd, void *ptr, size_t size)
+{
+	const char *buf = (const char *) ptr;
+	size_t		written = 0;
+
+	while (written < size)
+	{
+		ssize_t		rc = libssh2_sftp_write(fd->sftp_handle,
+											buf + written,
+											size - written);
+
+		if (rc < 0)
+			return written > 0 ? (ssize_t) written : rc;
+		if (rc == 0)
+			break;		/* no progress; report a short write */
+
+		written += (size_t) rc;
+		fd->compressed_position += rc;
+	}
+
+	return (ssize_t) written;
+}
+
 static int
 sftp_close(gfile_t *fd)
 {
@@ -452,7 +491,19 @@ gz_file_write_one_chunk(gfile_t *fd, int do_flush)
 		}
 		have = COMPRESSION_BUFFER_SIZE - z->s.avail_out;
 
-		if ( write_and_retry(fd, z->out, have) != have )
+		if (fd->is_sftp)
+		{
+#ifdef LIBSSH2
+			if (sftp_write(fd, z->out, have) != have)
+			{
+				gfile_printf_then_putc_newline("failed to sftp write, the stream ends");
+				(void)deflateEnd(&(z->s));
+				ret = -1;
+				break;
+			}
+#endif
+		}
+		else if ( write_and_retry(fd, z->out, have) != have )
 		{
 			/*
 			 * presently gfile_close calls gz_file_close only for the on_write case so we don't need
@@ -1446,6 +1497,11 @@ int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, cons
 	memset(fd, 0, sizeof *fd);
 
 	fd->is_sftp = TRUE;
+	if (flags != GFILE_OPEN_FOR_READ)
+	{
+		fd->is_write = TRUE;
+	}
+
 	int ans = sftp_open(fd, fpath, sftp_uname, sftp_passwd, sftp_hostaddr, sftp_port);
 	if (ans == 0)
 	{
@@ -1474,6 +1530,7 @@ int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, cons
 	fd->compressed_size = sta.filesize;
 
 	fd->read = sftp_read;
+	fd->write = sftp_write;
 	fd->close = sftp_close;
 
 	/*
@@ -1641,7 +1698,7 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char
 			return -1;
 		}
 	}
-	
+
 	fd->session = libssh2_session_init();
 	if (!fd->session)
 		return -1;
@@ -1684,8 +1741,18 @@ int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char
 		return -2;
 	}
 
-	fd->sftp_handle =
-		libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0);
+	if (fd->is_write)
+	{
+		fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath,
+								LIBSSH2_FXF_WRITE | LIBSSH2_FXF_CREAT | LIBSSH2_FXF_TRUNC,
+								LIBSSH2_SFTP_S_IRUSR | LIBSSH2_SFTP_S_IWUSR |
+								LIBSSH2_SFTP_S_IRGRP | LIBSSH2_SFTP_S_IROTH);
+	}
+	else
+	{
+		fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0);
+	}
+
 	if (!(fd->sftp_handle))
 	{
 		gfile_printf_then_putc_newline("Unable to open file with SFTP: %ld\n",
From 2aad6790d1a644fc2a7fbdc46e0e1dc049861596 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?=
Date: Mon, 18 Aug 2025 19:59:56 +0800
Subject: [PATCH 21/24] Add SFTP support to gpfdist for data ingestion

Implement the log rotation feature for gpfdist.

gpfdist, Cloudberry's parallel file distribution program, traditionally
required data files to be co-located with the gpfdist process.
This limitation made it cumbersome to load data from remote servers, often
requiring an extra data transfer step.

This commit addresses the issue of gpfdist logs continuously growing and
occupying a large amount of disk space in a persistent working scenario.
To avoid uncontrolled growth, the log rotation feature is implemented.

The characteristics are as follows:
1) Logs will be rotated when their size exceeds 512MB;
2) Only two logs are kept in the log set, one is the latest current log,
   and the other is the previous rotated log.
---
 src/bin/gpfdist/gpfdist.c | 70 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c
index 431ea9362ee..b28e2b4a572 100644
--- a/src/bin/gpfdist/gpfdist.c
+++ b/src/bin/gpfdist/gpfdist.c
@@ -361,6 +361,7 @@ struct request_t
 static int ggetpid();
 static void log_gpfdist_status();
 static void log_request_header(const request_t *r);
+static void log_aging_gpfdist();
 
 static void gprint(const request_t *r, const char* fmt, ...)
 pg_attribute_printf(2, 3);
@@ -2079,6 +2080,73 @@ static void log_request_header(const request_t *r)
 		gprintln(r, "%s:%s", r->in.req->hname[i], r->in.req->hvalue[i]);
 }
 
+/*
+ * log_aging_gpfdist
+ *
+ * Rotate the gpfdist log once it reaches MAX_GPFDIST_LOGSIZE bytes: the
+ * current log (opt.l) is renamed to "<opt.l>.old", replacing any previous
+ * rotated log, and stdout/stderr are reopened on a fresh file named opt.l.
+ * Does nothing when no log file is configured or the log is still small.
+ *
+ * Exits the process if the rotated file name does not fit in the local
+ * buffer or if stdout/stderr cannot be reopened on the new log.
+ */
+static void log_aging_gpfdist()
+{
+	struct stat	filestats;
+	char		newfilename[256];
+	int			len;
+
+	/* Logging to a file is optional; nothing to rotate without one. */
+	if (opt.l == NULL)
+		return;
+
+	if (stat(opt.l, &filestats) != 0 || filestats.st_size < MAX_GPFDIST_LOGSIZE)
+		return;
+
+	/* Let snprintf report truncation: the name needs room for ".old" too. */
+	len = snprintf(newfilename, sizeof(newfilename), "%s.old", opt.l);
+	if (len < 0 || (size_t) len >= sizeof(newfilename))
+	{
+		fprintf(stderr, "log file name is too long. please change log name under log_aging!\n");
+		exit(1);
+	}
+
+	/*
+	 * Drop the previous rotated log first: rename() is not guaranteed to
+	 * replace an existing target on every platform.  A missing file is fine.
+	 */
+	if (remove(newfilename) != 0 && errno != ENOENT)
+		fprintf(stderr, "failed to remove old log %s under log_aging: %s\n", newfilename, strerror(errno));
+
+	/* If the rename fails the streams still point at opt.l; keep using it. */
+	if (rename(opt.l, newfilename) != 0)
+	{
+		fprintf(stderr, "failed to rename log to %s under log_aging: %s\n", newfilename, strerror(errno));
+		return;
+	}
+
+	/* Redirect stderr and stdout to the new log file */
+	if (freopen(opt.l, "a", stderr) == NULL)
+	{
+		/* stderr is closed after a failed freopen; stdout is still open. */
+		fprintf(stdout, "failed to redirect stderr to log: %s under log_aging.\n", strerror(errno));
+		exit(1);
+	}
+#ifndef WIN32
+	setlinebuf(stderr);
+#endif
+
+	if (freopen(opt.l, "a", stdout) == NULL)
+	{
+		fprintf(stderr, "failed to redirect stdout to log: %s under log_aging.\n", strerror(errno));
+		exit(1);
+	}
+#ifndef WIN32
+	setlinebuf(stdout);
+#endif
+}
+
 /*
  * do_read_request
  *
@@ -4704,6 +4772,8 @@ static void do_close(int fd, short event, void *arg)
 		apr_pool_destroy(r->pool);
 
 	fflush(stdout);
+
+	log_aging_gpfdist();
 }
 
 /*
From ae5ee92ca5d9497829a26cd344a2aa9e86e68147 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?=
Date: Tue, 19 Aug 2025 16:38:07 +0800
Subject: [PATCH 22/24] Add SFTP support to gpfdist for data ingestion

Implement the log rotation feature for gpfdist.

gpfdist, Cloudberry's parallel file distribution program, traditionally
required data files to be co-located with the gpfdist process. This
limitation made it cumbersome to load data from remote servers, often
requiring an extra data transfer step.

This commit addresses the issue of gpfdist logs continuously growing and
occupying a large amount of disk space in a persistent working scenario.
To avoid uncontrolled growth, the log rotation feature is implemented.
The characteristics are as follows: Define the macro for log size as MAX_GPFDIST_LOGSIZE=512MB --- src/bin/gpfdist/gpfdist.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index b28e2b4a572..2a51017ffed 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -74,6 +74,8 @@ #define DEFAULT_COMPRESS_LEVEL 3 #define MAX_FRAME_SIZE 65536 +#define MAX_GPFDIST_LOGSIZE (512 * 1024 * 1024) // 512MB + /* A data block */ typedef struct blockhdr_t blockhdr_t; struct blockhdr_t From ef0466d4542fa90f05331e23d8c867e75755a4db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Thu, 16 Oct 2025 16:42:40 +0800 Subject: [PATCH 23/24] Add libssh2-devel package for gpfdist SFTP protocol gpfdist, Cloudberry's parallel file distribution program, traditionally required data files to be co-located with the gpfdist process. This limitation made it cumbersome to load data from remote servers, often requiring an extra data transfer step. This commit adds the libssh2-devel package to the Rocky 8 and Rocky 9 build images. libssh2-devel is introduced as a dependency package for the new feature in the PR apache#1226. 
--- devops/deploy/docker/build/rocky8/Dockerfile | 1 + devops/deploy/docker/build/rocky9/Dockerfile | 1 + 2 files changed, 2 insertions(+) diff --git a/devops/deploy/docker/build/rocky8/Dockerfile b/devops/deploy/docker/build/rocky8/Dockerfile index daef18a4e4e..7bcb8c645d5 100644 --- a/devops/deploy/docker/build/rocky8/Dockerfile +++ b/devops/deploy/docker/build/rocky8/Dockerfile @@ -132,6 +132,7 @@ RUN dnf makecache && \ zlib-devel && \ dnf install -y -d0 --enablerepo=devel \ libuv-devel \ + libssh2-devel \ libyaml-devel \ perl-IPC-Run \ protobuf-devel && \ diff --git a/devops/deploy/docker/build/rocky9/Dockerfile b/devops/deploy/docker/build/rocky9/Dockerfile index 6246ae79d75..6e19b850cbe 100644 --- a/devops/deploy/docker/build/rocky9/Dockerfile +++ b/devops/deploy/docker/build/rocky9/Dockerfile @@ -135,6 +135,7 @@ RUN dnf makecache && \ zlib-devel && \ dnf install -y --enablerepo=crb \ libuv-devel \ + libssh2-devel \ libyaml-devel \ perl-IPC-Run \ protobuf-devel && \ From 8f4673a70a105ad7a7fe5e26b82b331488a2f58c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=B9=B310304955?= Date: Sat, 18 Oct 2025 15:39:16 +0800 Subject: [PATCH 24/24] Feature: Add SFTP support to gpfdist for data Add the header file references for the stat system call. --- src/bin/gpfdist/gpfdist.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index 2a51017ffed..826ec2569b6 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -71,6 +71,10 @@ #include #endif +#include +#include +#include + #define DEFAULT_COMPRESS_LEVEL 3 #define MAX_FRAME_SIZE 65536