From 9239ca7406117353f0dda7e5f61dff1c730f7b6e Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Fri, 27 Jun 2025 06:45:31 +0000 Subject: [PATCH 1/8] feat: Add JDBC log S3 upload with IAM role support - Add JdbcLogUploader class for automatic JDBC log upload to S3 - Support both IAM role and explicit AWS credentials authentication - Add configuration options for S3 upload (bucket, prefix, region, credentials) - Enable JDBC tracing when log upload is enabled - Upload logs only when communication errors occur - Implement proper resource management with AutoCloseable - Add cross-platform support using system temp directory - Update README with configuration examples and usage instructions - Add AWS SDK v2 dependencies with conflict exclusions This feature helps with debugging Snowflake connection issues by automatically uploading JDBC driver logs to S3 when errors occur. --- README.md | 49 +++++++++++++ build.gradle | 18 +++++ .../embulk/output/SnowflakeOutputPlugin.java | 69 +++++++++++++++++++ .../org/embulk/output/s3/JdbcLogUploader.java | 67 ++++++++++++++++++ 4 files changed, 203 insertions(+) create mode 100644 src/main/java/org/embulk/output/s3/JdbcLogUploader.java diff --git a/README.md b/README.md index 5d7b3de..12c4f3d 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,12 @@ Snowflake output plugin for Embulk loads records to Snowflake. - **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values) - **batch_size**: size of a single batch insert (integer, default: 16777216) - **match_by_column_name**: specify whether to load semi-structured data into columns in the target table that match corresponding columns represented in the data. ("case_sensitive", "case_insensitive", "none", default: "none") +- **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false) +- **s3_bucket**: S3 bucket name for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) +- **s3_prefix**: S3 key prefix for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) +- **s3_region**: AWS region for S3 bucket (string, required when upload_jdbc_log_to_s3 is true) +- **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses IAM role if not specified) +- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses IAM role if not specified) - **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite timezone for each columns using column_options option. (string, default: `UTC`) - **column_options**: advanced: a key-value pairs where key is a column name and value is options for the column. - **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp) @@ -62,6 +68,49 @@ Snowflake output plugin for Embulk loads records to Snowflake. * Transactional: Yes. * Resumable: No. +## JDBC Log Upload to S3 + +This plugin supports automatic upload of JDBC driver logs to S3 when communication errors occur. This feature is useful for debugging connection issues with Snowflake. + +### Configuration Example + +```yaml +out: + type: snowflake + host: your-account.snowflakecomputing.com + user: your_user + password: your_password + warehouse: your_warehouse + database: your_database + schema: your_schema + table: your_table + mode: insert + + # JDBC log upload configuration + upload_jdbc_log_to_s3: true + s3_bucket: your-log-bucket + s3_prefix: snowflake-jdbc-logs + s3_region: us-east-1 + + # Optional: Explicit AWS credentials (uses IAM role if not specified) + s3_access_key_id: YOUR_ACCESS_KEY_ID + s3_secret_access_key: YOUR_SECRET_ACCESS_KEY +``` + +### Authentication Methods + +1. **IAM Role (Recommended)**: Leave `s3_access_key_id` and `s3_secret_access_key` unspecified. The plugin will use the default AWS credentials provider chain (IAM role, environment variables, etc.). + +2. **Explicit Credentials**: Specify both `s3_access_key_id` and `s3_secret_access_key` for explicit authentication. + +### Behavior + +- JDBC logs are only uploaded when communication errors occur during Snowflake operations +- The plugin automatically finds the latest `snowflake_jdbc*.log` file in the system temp directory +- Logs are uploaded to `s3://{bucket}/{prefix}/{filename}` +- If S3 upload fails, a warning is logged but the original error is still thrown +- If required S3 configuration is missing, a warning is logged and log upload is skipped + ## Build ``` diff --git a/build.gradle b/build.gradle index e308b73..04a69f9 100644 --- a/build.gradle +++ b/build.gradle @@ -52,6 +52,24 @@ dependencies { compile "org.embulk:embulk-output-jdbc:0.10.2" compile "net.snowflake:snowflake-jdbc:3.13.26" + + implementation platform("software.amazon.awssdk:bom:2.25.20") + + implementation("software.amazon.awssdk:s3") { + exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" + exclude group: "com.fasterxml.jackson.core", module: "jackson-core" + exclude group: "com.fasterxml.jackson.core", module: "jackson-databind" + exclude group: "joda-time", module: "joda-time" + exclude group: "commons-logging", module: "commons-logging" + } + + implementation("software.amazon.awssdk:regions") { + exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" + exclude group: "com.fasterxml.jackson.core", module: "jackson-core" + exclude group: "com.fasterxml.jackson.core", module: "jackson-databind" + exclude group: "joda-time", module: "joda-time" + exclude group: "commons-logging", module: "commons-logging" + } } embulkPlugin { mainClass = "org.embulk.output.SnowflakeOutputPlugin" diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 9c01d97..04be164 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import java.io.File; import java.io.IOException; import java.sql.SQLException; import java.sql.Types; @@ -15,6 +16,7 @@ import org.embulk.config.ConfigSource; import org.embulk.config.TaskSource; import org.embulk.output.jdbc.*; +import org.embulk.output.s3.JdbcLogUploader; import org.embulk.output.snowflake.PrivateKeyReader; import org.embulk.output.snowflake.SnowflakeCopyBatchInsert; import org.embulk.output.snowflake.SnowflakeOutputConnection; @@ -91,6 +93,30 @@ public interface SnowflakePluginTask extends PluginTask { @ConfigDefault("\"none\"") public MatchByColumnName getMatchByColumnName(); + @Config("upload_jdbc_log_to_s3") + @ConfigDefault("false") + public boolean getUploadJdbcLogToS3(); + + @Config("s3_bucket") + @ConfigDefault("null") + public String getS3Bucket(); + + @Config("s3_prefix") + @ConfigDefault("null") + public String getS3Prefix(); + + @Config("s3_region") + @ConfigDefault("null") + public String getS3Region(); + + @Config("s3_access_key_id") + @ConfigDefault("null") + public String getS3AccessKeyId(); + + @Config("s3_secret_access_key") + @ConfigDefault("null") + public String getS3SecretAccessKey(); + public void setCopyIntoTableColumnNames(String[] columnNames); public String[] getCopyIntoTableColumnNames(); @@ -139,6 +165,9 @@ public static MatchByColumnName fromString(String value) { private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115; private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195; + private static final String ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE = + "JDBC driver encountered communication error"; + @Override protected Class getTaskClass() { return SnowflakePluginTask.class; @@ -198,6 +227,10 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet props.setProperty("CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX", "true"); props.setProperty("MULTI_STATEMENT_COUNT", "0"); + if (t.getUploadJdbcLogToS3()) { + props.setProperty("tracing", "ALL"); + } + props.putAll(t.getOptions()); logConnectionProperties(url, props); @@ -222,6 +255,42 @@ public ConfigDiff transaction( runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); } } catch (Exception e) { + if (e instanceof SQLException) { + String message = e.getMessage(); + if (message != null + && message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE) + && t.getUploadJdbcLogToS3() == true) { + final String s3Bucket = t.getS3Bucket(); + final String s3Prefix = t.getS3Prefix(); + final String s3Region = t.getS3Region(); + final String s3AccessKeyId = t.getS3AccessKeyId(); + final String s3SecretAccessKey = t.getS3SecretAccessKey(); + if (s3Bucket == null || s3Prefix == null || s3Region == null) { + logger.warn("s3_bucket, s3_prefix, and s3_region must be set when upload_jdbc_log_to_s3 is true"); + } else { + try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(s3Bucket, s3Prefix, s3Region, s3AccessKeyId, s3SecretAccessKey)) { + // snowflake_jdbc*.log で最新のファイルを探してアップロード + String tmpDir = System.getProperty("java.io.tmpdir", "/tmp"); + File logDir = new File(tmpDir); + File[] logFiles = + logDir.listFiles( + (dir, name) -> name.startsWith("snowflake_jdbc") && name.endsWith(".log")); + if (logFiles != null && logFiles.length > 0) { + // 最終更新日時が新しいファイルを選択 + Optional latest = + Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified)); + if (latest.isPresent()) { + jdbcLogUploader.uploadIfExists(latest.get()); + } + } else { + logger.warn("No snowflake_jdbc*.log file found in {} for upload", tmpDir); + } + } catch (Exception uploadException) { + logger.warn("Failed to upload JDBC log to S3: {}", uploadException.getMessage()); + } + } + } + } if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); diff --git a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java new file mode 100644 index 0000000..c085197 --- /dev/null +++ b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java @@ -0,0 +1,67 @@ +package org.embulk.output.s3; + +import java.io.File; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +public class JdbcLogUploader implements AutoCloseable { + private final Logger logger = LoggerFactory.getLogger(JdbcLogUploader.class); + + private final S3Client s3Client; + private final String bucket; + private final String prefix; + private final Region region; + + public JdbcLogUploader(String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) { + this.bucket = bucket; + this.prefix = prefix; + this.region = Region.of(region); + + if (accessKeyId != null && secretAccessKey != null) { + // Use explicit credentials + AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey); + this.s3Client = S3Client.builder() + .region(this.region) + .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) + .build(); + } else { + // Use default credentials provider (IAM role, environment variables, etc.) + this.s3Client = S3Client.builder() + .region(this.region) + .credentialsProvider(DefaultCredentialsProvider.create()) + .build(); + } + } + + public void uploadIfExists(File file) { + if (!file.exists()) { + logger.warn("File not found: {}", file.getAbsolutePath()); + return; + } + + String key = prefix + "/" + file.getName(); + try { + PutObjectRequest putObjectRequest = + PutObjectRequest.builder().bucket(bucket).key(key).build(); + + s3Client.putObject(putObjectRequest, RequestBody.fromFile(file)); + logger.info("Uploaded {} to s3://{}/{}", file.getAbsolutePath(), bucket, key); + } catch (Exception e) { + logger.error("Failed to upload {} to S3: {}", file.getAbsolutePath(), e.getMessage(), e); + } + } + + @Override + public void close() { + if (s3Client != null) { + s3Client.close(); + } + } +} From d4257ed39aaf7ad1d12e0ff9fa864af611f0328e Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Fri, 27 Jun 2025 07:45:33 +0000 Subject: [PATCH 2/8] Fix S3 configuration fields to use Optional - Change s3_bucket, s3_prefix, s3_region, s3_access_key_id, s3_secret_access_key from String to Optional - Remove @ConfigDefault("null") annotations as they are not needed for Optional types - Update usage in transaction method to handle Optional values properly - Update README.md to clarify that S3 credentials are optional and use default AWS credentials provider chain --- README.md | 4 +-- .../embulk/output/SnowflakeOutputPlugin.java | 29 ++++++++----------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 12c4f3d..d87058c 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,8 @@ Snowflake output plugin for Embulk loads records to Snowflake. - **s3_bucket**: S3 bucket name for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) - **s3_prefix**: S3 key prefix for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) - **s3_region**: AWS region for S3 bucket (string, required when upload_jdbc_log_to_s3 is true) -- **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses IAM role if not specified) -- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses IAM role if not specified) +- **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses default AWS credentials provider chain if not specified) +- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses default AWS credentials provider chain if not specified) - **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite timezone for each columns using column_options option. (string, default: `UTC`) - **column_options**: advanced: a key-value pairs where key is a column name and value is options for the column. - **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 04be164..bdc4877 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -98,24 +98,19 @@ public interface SnowflakePluginTask extends PluginTask { public boolean getUploadJdbcLogToS3(); @Config("s3_bucket") - @ConfigDefault("null") - public String getS3Bucket(); + public Optional getS3Bucket(); @Config("s3_prefix") - @ConfigDefault("null") - public String getS3Prefix(); + public Optional getS3Prefix(); @Config("s3_region") - @ConfigDefault("null") - public String getS3Region(); + public Optional getS3Region(); @Config("s3_access_key_id") - @ConfigDefault("null") - public String getS3AccessKeyId(); + public Optional getS3AccessKeyId(); @Config("s3_secret_access_key") - @ConfigDefault("null") - public String getS3SecretAccessKey(); + public Optional getS3SecretAccessKey(); public void setCopyIntoTableColumnNames(String[] columnNames); @@ -260,15 +255,15 @@ public ConfigDiff transaction( if (message != null && message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE) && t.getUploadJdbcLogToS3() == true) { - final String s3Bucket = t.getS3Bucket(); - final String s3Prefix = t.getS3Prefix(); - final String s3Region = t.getS3Region(); - final String s3AccessKeyId = t.getS3AccessKeyId(); - final String s3SecretAccessKey = t.getS3SecretAccessKey(); - if (s3Bucket == null || s3Prefix == null || s3Region == null) { + final Optional s3Bucket = t.getS3Bucket(); + final Optional s3Prefix = t.getS3Prefix(); + final Optional s3Region = t.getS3Region(); + final Optional s3AccessKeyId = t.getS3AccessKeyId(); + final Optional s3SecretAccessKey = t.getS3SecretAccessKey(); + if (!s3Bucket.isPresent() || !s3Prefix.isPresent() || !s3Region.isPresent()) { logger.warn("s3_bucket, s3_prefix, and s3_region must be set when upload_jdbc_log_to_s3 is true"); } else { - try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(s3Bucket, s3Prefix, s3Region, s3AccessKeyId, s3SecretAccessKey)) { + try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(s3Bucket.get(), s3Prefix.get(), s3Region.get(), s3AccessKeyId.orElse(null), s3SecretAccessKey.orElse(null))) { // snowflake_jdbc*.log で最新のファイルを探してアップロード String tmpDir = System.getProperty("java.io.tmpdir", "/tmp"); File logDir = new File(tmpDir); From 9ba88f1bd9f3d0dfd7ac558564e79f7ef469e4f4 Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Fri, 27 Jun 2025 07:53:25 +0000 Subject: [PATCH 3/8] Add @ConfigDefault("null") to Optional S3 fields - Add @ConfigDefault("null") annotations to s3_bucket, s3_prefix, s3_region, s3_access_key_id, s3_secret_access_key - This is required for Embulk's config system to properly handle Optional fields with null defaults - Fixes 'Field is required but not set' error when S3 configuration is not provided --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index bdc4877..b2b92b0 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -98,18 +98,23 @@ public interface SnowflakePluginTask extends PluginTask { public boolean getUploadJdbcLogToS3(); @Config("s3_bucket") + @ConfigDefault("null") public Optional getS3Bucket(); @Config("s3_prefix") + @ConfigDefault("null") public Optional getS3Prefix(); @Config("s3_region") + @ConfigDefault("null") public Optional getS3Region(); @Config("s3_access_key_id") + @ConfigDefault("null") public Optional getS3AccessKeyId(); @Config("s3_secret_access_key") + @ConfigDefault("null") public Optional getS3SecretAccessKey(); public void setCopyIntoTableColumnNames(String[] columnNames); From 394440d998fd015c71af257592bd8738c58b06d4 Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Mon, 30 Jun 2025 02:34:51 +0000 Subject: [PATCH 4/8] feat: Add timestamp to S3 uploaded JDBC log filenames - Add automatic timestamp formatting (yyyyMMdd_HHmmss) to uploaded filenames - Example: snowflake_jdbc.log -> snowflake_jdbc_20250630_021500.log - Update README with timestamp feature documentation - Clarify s3_prefix as optional parameter in documentation - Improve S3 path handling for empty prefix cases --- README.md | 7 +-- build.gradle | 19 ++++++-- .../embulkPluginRuntime.lockfile | 42 ++++++++++++++++ .../embulk/output/SnowflakeOutputPlugin.java | 17 ++++--- .../org/embulk/output/s3/JdbcLogUploader.java | 48 +++++++++++++------ 5 files changed, 107 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index d87058c..fcd411a 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Snowflake output plugin for Embulk loads records to Snowflake. - **match_by_column_name**: specify whether to load semi-structured data into columns in the target table that match corresponding columns represented in the data. ("case_sensitive", "case_insensitive", "none", default: "none") - **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false) - **s3_bucket**: S3 bucket name for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) -- **s3_prefix**: S3 key prefix for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true) +- **s3_prefix**: S3 key prefix for JDBC log upload (string, optional - if empty, files are uploaded to bucket root) - **s3_region**: AWS region for S3 bucket (string, required when upload_jdbc_log_to_s3 is true) - **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses default AWS credentials provider chain if not specified) - **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses default AWS credentials provider chain if not specified) @@ -89,7 +89,7 @@ out: # JDBC log upload configuration upload_jdbc_log_to_s3: true s3_bucket: your-log-bucket - s3_prefix: snowflake-jdbc-logs + s3_prefix: snowflake-jdbc-logs # Optional: omit to upload to bucket root s3_region: us-east-1 # Optional: Explicit AWS credentials (uses IAM role if not specified) @@ -107,7 +107,8 @@ out: - JDBC logs are only uploaded when communication errors occur during Snowflake operations - The plugin automatically finds the latest `snowflake_jdbc*.log` file in the system temp directory -- Logs are uploaded to `s3://{bucket}/{prefix}/{filename}` +- **Automatic timestamping**: Upload timestamp is automatically added to the filename (format: `yyyyMMdd_HHmmss`) + - Example: `snowflake_jdbc0.log.0` → `snowflake_jdbc0.log_20250630_021500.0` - If S3 upload fails, a warning is logged but the original error is still thrown - If required S3 configuration is missing, a warning is logged and log upload is skipped diff --git a/build.gradle b/build.gradle index 04a69f9..0ccba01 100644 --- a/build.gradle +++ b/build.gradle @@ -53,23 +53,36 @@ dependencies { compile "org.embulk:embulk-output-jdbc:0.10.2" compile "net.snowflake:snowflake-jdbc:3.13.26" - implementation platform("software.amazon.awssdk:bom:2.25.20") + compile platform("software.amazon.awssdk:bom:2.25.20") - implementation("software.amazon.awssdk:s3") { + compile("software.amazon.awssdk:s3") { exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" exclude group: "com.fasterxml.jackson.core", module: "jackson-core" exclude group: "com.fasterxml.jackson.core", module: "jackson-databind" exclude group: "joda-time", module: "joda-time" exclude group: "commons-logging", module: "commons-logging" + exclude group: "org.slf4j", module: "slf4j-api" } - implementation("software.amazon.awssdk:regions") { + compile("software.amazon.awssdk:regions") { exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" exclude group: "com.fasterxml.jackson.core", module: "jackson-core" exclude group: "com.fasterxml.jackson.core", module: "jackson-databind" exclude group: "joda-time", module: "joda-time" exclude group: "commons-logging", module: "commons-logging" + exclude group: "org.slf4j", module: "slf4j-api" } + + compile("software.amazon.awssdk:auth") { + exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" + exclude group: "com.fasterxml.jackson.core", module: "jackson-core" + exclude group: "com.fasterxml.jackson.core", module: "jackson-databind" + exclude group: "joda-time", module: "joda-time" + exclude group: "commons-logging", module: "commons-logging" + exclude group: "org.slf4j", module: "slf4j-api" + } + + compile "commons-logging:commons-logging:1.2" } embulkPlugin { mainClass = "org.embulk.output.SnowflakeOutputPlugin" diff --git a/gradle/dependency-locks/embulkPluginRuntime.lockfile b/gradle/dependency-locks/embulkPluginRuntime.lockfile index 9ba3976..512bfa2 100644 --- a/gradle/dependency-locks/embulkPluginRuntime.lockfile +++ b/gradle/dependency-locks/embulkPluginRuntime.lockfile @@ -5,11 +5,53 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7 com.fasterxml.jackson.core:jackson-core:2.6.7 com.fasterxml.jackson.core:jackson-databind:2.6.7 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7 +commons-codec:commons-codec:1.15 +commons-logging:commons-logging:1.2 +io.netty:netty-buffer:4.1.108.Final +io.netty:netty-codec-http2:4.1.108.Final +io.netty:netty-codec-http:4.1.108.Final +io.netty:netty-codec:4.1.108.Final +io.netty:netty-common:4.1.108.Final +io.netty:netty-handler:4.1.108.Final +io.netty:netty-resolver:4.1.108.Final +io.netty:netty-transport-classes-epoll:4.1.108.Final +io.netty:netty-transport-native-unix-common:4.1.108.Final +io.netty:netty-transport:4.1.108.Final javax.validation:validation-api:1.1.0.Final net.snowflake:snowflake-jdbc:3.13.26 +org.apache.httpcomponents:httpclient:4.5.13 +org.apache.httpcomponents:httpcore:4.4.13 org.embulk:embulk-output-jdbc:0.10.2 org.embulk:embulk-util-config:0.3.0 org.embulk:embulk-util-json:0.1.1 org.embulk:embulk-util-retryhelper:0.8.2 org.embulk:embulk-util-rubytime:0.3.2 org.embulk:embulk-util-timestamp:0.2.1 +org.reactivestreams:reactive-streams:1.0.4 +software.amazon.awssdk:annotations:2.25.20 +software.amazon.awssdk:apache-client:2.25.20 +software.amazon.awssdk:arns:2.25.20 +software.amazon.awssdk:auth:2.25.20 +software.amazon.awssdk:aws-core:2.25.20 +software.amazon.awssdk:aws-query-protocol:2.25.20 +software.amazon.awssdk:aws-xml-protocol:2.25.20 +software.amazon.awssdk:checksums-spi:2.25.20 +software.amazon.awssdk:checksums:2.25.20 +software.amazon.awssdk:crt-core:2.25.20 +software.amazon.awssdk:endpoints-spi:2.25.20 +software.amazon.awssdk:http-auth-aws:2.25.20 +software.amazon.awssdk:http-auth-spi:2.25.20 +software.amazon.awssdk:http-auth:2.25.20 +software.amazon.awssdk:http-client-spi:2.25.20 +software.amazon.awssdk:identity-spi:2.25.20 +software.amazon.awssdk:json-utils:2.25.20 +software.amazon.awssdk:metrics-spi:2.25.20 +software.amazon.awssdk:netty-nio-client:2.25.20 +software.amazon.awssdk:profiles:2.25.20 +software.amazon.awssdk:protocol-core:2.25.20 +software.amazon.awssdk:regions:2.25.20 +software.amazon.awssdk:s3:2.25.20 +software.amazon.awssdk:sdk-core:2.25.20 +software.amazon.awssdk:third-party-jackson-core:2.25.20 +software.amazon.awssdk:utils:2.25.20 +software.amazon.eventstream:eventstream:1.0.1 diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index b2b92b0..3a8434a 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -250,6 +250,7 @@ public ConfigDiff transaction( try { snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); snowflakeCon.runCreateStage(stageIdentifier); + configDiff = super.transaction(config, schema, taskCount, control); if (t.getDeleteStage()) { runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); @@ -265,18 +266,22 @@ public ConfigDiff transaction( final Optional s3Region = t.getS3Region(); final Optional s3AccessKeyId = t.getS3AccessKeyId(); final Optional s3SecretAccessKey = t.getS3SecretAccessKey(); - if (!s3Bucket.isPresent() || !s3Prefix.isPresent() || !s3Region.isPresent()) { - logger.warn("s3_bucket, s3_prefix, and s3_region must be set when upload_jdbc_log_to_s3 is true"); + if (!s3Bucket.isPresent() || !s3Region.isPresent()) { + logger.warn("s3_bucket, and s3_region must be set when upload_jdbc_log_to_s3 is true"); } else { - try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(s3Bucket.get(), s3Prefix.get(), s3Region.get(), s3AccessKeyId.orElse(null), s3SecretAccessKey.orElse(null))) { - // snowflake_jdbc*.log で最新のファイルを探してアップロード + try (JdbcLogUploader jdbcLogUploader = + new JdbcLogUploader( + s3Bucket.get(), + s3Prefix.orElse(""), + s3Region.get(), + s3AccessKeyId.orElse(null), + s3SecretAccessKey.orElse(null))) { String tmpDir = System.getProperty("java.io.tmpdir", "/tmp"); File logDir = new File(tmpDir); File[] logFiles = logDir.listFiles( - (dir, name) -> name.startsWith("snowflake_jdbc") && name.endsWith(".log")); + (dir, name) -> name.startsWith("snowflake_jdbc")); if (logFiles != null && logFiles.length > 0) { - // 最終更新日時が新しいファイルを選択 Optional latest = Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified)); if (latest.isPresent()) { diff --git a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java index c085197..ba92a18 100644 --- a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java +++ b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java @@ -1,11 +1,13 @@ package org.embulk.output.s3; import java.io.File; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -19,24 +21,27 @@ public class JdbcLogUploader implements AutoCloseable { private final String prefix; private final Region region; - public JdbcLogUploader(String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) { + public JdbcLogUploader( + String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) { this.bucket = bucket; this.prefix = prefix; this.region = Region.of(region); - + if (accessKeyId != null && secretAccessKey != null) { // Use explicit credentials AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey); - this.s3Client = S3Client.builder() - .region(this.region) - .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) - .build(); + this.s3Client = + S3Client.builder() + .region(this.region) + .credentialsProvider(StaticCredentialsProvider.create(awsCreds)) + .build(); } else { // Use default credentials provider (IAM role, environment variables, etc.) - this.s3Client = S3Client.builder() - .region(this.region) - .credentialsProvider(DefaultCredentialsProvider.create()) - .build(); + this.s3Client = + S3Client.builder() + .region(this.region) + .credentialsProvider(DefaultCredentialsProvider.create()) + .build(); } } @@ -46,15 +51,30 @@ public void uploadIfExists(File file) { return; } - String key = prefix + "/" + file.getName(); + // Add timestamp to filename + String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")); + String originalFileName = file.getName(); + String fileNameWithTimestamp; + + // Insert timestamp before file extension + int lastDotIndex = originalFileName.lastIndexOf('.'); + if (lastDotIndex > 0) { + String nameWithoutExt = originalFileName.substring(0, lastDotIndex); + String extension = originalFileName.substring(lastDotIndex); + fileNameWithTimestamp = nameWithoutExt + "_" + timestamp + extension; + } else { + fileNameWithTimestamp = originalFileName + "_" + timestamp; + } + + String key = prefix.isEmpty() ? fileNameWithTimestamp : prefix + "/" + fileNameWithTimestamp; try { PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(bucket).key(key).build(); s3Client.putObject(putObjectRequest, RequestBody.fromFile(file)); - logger.info("Uploaded {} to s3://{}/{}", file.getAbsolutePath(), bucket, key); + logger.info("Uploaded {}", file.getAbsolutePath()); } catch (Exception e) { - logger.error("Failed to upload {} to S3: {}", file.getAbsolutePath(), e.getMessage(), e); + logger.error("Failed to upload {}", file.getAbsolutePath()); } } From 91f7076cfb6ccf18ef59bdcef6c64ed692a186d9 Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Mon, 30 Jun 2025 03:05:53 +0000 Subject: [PATCH 5/8] style: Fix code formatting with spotless - Apply spotless formatting to Java files - Fix line breaks and spacing issues - Ensure code style compliance --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 4 +--- src/main/java/org/embulk/output/s3/JdbcLogUploader.java | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 3a8434a..6755206 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -278,9 +278,7 @@ public ConfigDiff transaction( s3SecretAccessKey.orElse(null))) { String tmpDir = System.getProperty("java.io.tmpdir", "/tmp"); File logDir = new File(tmpDir); - File[] logFiles = - logDir.listFiles( - (dir, name) -> name.startsWith("snowflake_jdbc")); + File[] logFiles = logDir.listFiles((dir, name) -> name.startsWith("snowflake_jdbc")); if (logFiles != null && logFiles.length > 0) { Optional latest = Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified)); diff --git a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java index ba92a18..c1e7fa6 100644 --- a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java +++ b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java @@ -55,7 +55,7 @@ public void uploadIfExists(File file) { String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")); String originalFileName = file.getName(); String fileNameWithTimestamp; - + // Insert timestamp before file extension int lastDotIndex = originalFileName.lastIndexOf('.'); if (lastDotIndex > 0) { @@ -65,7 +65,7 @@ public void uploadIfExists(File file) { } else { fileNameWithTimestamp = originalFileName + "_" + timestamp; } - + String key = prefix.isEmpty() ? fileNameWithTimestamp : prefix + "/" + fileNameWithTimestamp; try { PutObjectRequest putObjectRequest = From a313e67f8393c80d3cf38390e49954278a7ad8cc Mon Sep 17 00:00:00 2001 From: kentoyoshida Date: Mon, 30 Jun 2025 12:10:32 +0700 Subject: [PATCH 6/8] Update src/main/java/org/embulk/output/s3/JdbcLogUploader.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/embulk/output/s3/JdbcLogUploader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java index c1e7fa6..fd693a4 100644 --- a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java +++ b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java @@ -74,7 +74,7 @@ public void uploadIfExists(File file) { s3Client.putObject(putObjectRequest, RequestBody.fromFile(file)); logger.info("Uploaded {}", file.getAbsolutePath()); } catch (Exception e) { - logger.error("Failed to upload {}", file.getAbsolutePath()); + logger.error("Failed to upload {}", file.getAbsolutePath(), e); } } From 42d772e8e7f39a2c270aaaf1f04c95032eba3903 Mon Sep 17 00:00:00 2001 From: kentoyoshida Date: Mon, 30 Jun 2025 12:11:51 +0700 Subject: [PATCH 7/8] Update src/main/java/org/embulk/output/SnowflakeOutputPlugin.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 6755206..a3d8479 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -268,6 +268,8 @@ public ConfigDiff transaction( final Optional s3SecretAccessKey = t.getS3SecretAccessKey(); if (!s3Bucket.isPresent() || !s3Region.isPresent()) { logger.warn("s3_bucket, and s3_region must be set when upload_jdbc_log_to_s3 is true"); + } else if (s3AccessKeyId.isPresent() != s3SecretAccessKey.isPresent()) { + logger.warn("Both s3_access_key_id and s3_secret_access_key must be set together or omitted."); } else { try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader( From 1d856c0354fad222bba71033b42bf970f6226270 Mon Sep 17 00:00:00 2001 From: kento_yoshida Date: Mon, 30 Jun 2025 07:46:08 +0000 Subject: [PATCH 8/8] style: Fix code formatting with spotless --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index a3d8479..777736d 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -269,7 +269,8 @@ public ConfigDiff transaction( if (!s3Bucket.isPresent() || !s3Region.isPresent()) { logger.warn("s3_bucket, and s3_region must be set when upload_jdbc_log_to_s3 is true"); } else if (s3AccessKeyId.isPresent() != s3SecretAccessKey.isPresent()) { - logger.warn("Both s3_access_key_id and s3_secret_access_key must be set together or omitted."); + logger.warn( + "Both s3_access_key_id and s3_secret_access_key must be set together or omitted."); } else { try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(