diff --git a/README.md b/README.md
index 5d7b3de..fcd411a 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,12 @@ Snowflake output plugin for Embulk loads records to Snowflake.
 - **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
 - **batch_size**: size of a single batch insert (integer, default: 16777216)
 - **match_by_column_name**: specify whether to load semi-structured data into columns in the target table that match corresponding columns represented in the data. ("case_sensitive", "case_insensitive", "none", default: "none")
+- **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false)
+- **s3_bucket**: S3 bucket name for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true)
+- **s3_prefix**: S3 key prefix for JDBC log upload (string, optional - if empty, files are uploaded to bucket root)
+- **s3_region**: AWS region for S3 bucket (string, required when upload_jdbc_log_to_s3 is true)
+- **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses default AWS credentials provider chain if not specified)
+- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses default AWS credentials provider chain if not specified)
 - **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite timezone for each columns using column_options option. (string, default: `UTC`)
 - **column_options**: advanced: a key-value pairs where key is a column name and value is options for the column.
   - **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp)
@@ -62,6 +68,50 @@ Snowflake output plugin for Embulk loads records to Snowflake.
 * Transactional: Yes.
 * Resumable: No.
 
+## JDBC Log Upload to S3
+
+This plugin supports automatic upload of JDBC driver logs to S3 when communication errors occur. This feature is useful for debugging connection issues with Snowflake.
+
+### Configuration Example
+
+```yaml
+out:
+  type: snowflake
+  host: your-account.snowflakecomputing.com
+  user: your_user
+  password: your_password
+  warehouse: your_warehouse
+  database: your_database
+  schema: your_schema
+  table: your_table
+  mode: insert
+
+  # JDBC log upload configuration
+  upload_jdbc_log_to_s3: true
+  s3_bucket: your-log-bucket
+  s3_prefix: snowflake-jdbc-logs # Optional: omit to upload to bucket root
+  s3_region: us-east-1
+
+  # Optional: Explicit AWS credentials (uses IAM role if not specified)
+  s3_access_key_id: YOUR_ACCESS_KEY_ID
+  s3_secret_access_key: YOUR_SECRET_ACCESS_KEY
+```
+
+### Authentication Methods
+
+1. **IAM Role (Recommended)**: Leave `s3_access_key_id` and `s3_secret_access_key` unspecified. The plugin will use the default AWS credentials provider chain (IAM role, environment variables, etc.).
+
+2. **Explicit Credentials**: Specify both `s3_access_key_id` and `s3_secret_access_key` for explicit authentication.
+
+### Behavior
+
+- JDBC logs are only uploaded when communication errors occur during Snowflake operations
+- The plugin automatically finds the latest `snowflake_jdbc*.log` file in the system temp directory
+- **Automatic timestamping**: Upload timestamp is automatically added to the filename (format: `yyyyMMdd_HHmmss`)
+  - Example: `snowflake_jdbc0.log.0` → `snowflake_jdbc0.log_20250630_021500.0`
+- If S3 upload fails, a warning is logged but the original error is still thrown
+- If required S3 configuration is missing, a warning is logged and log upload is skipped
+
 ## Build
 
 ```
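
For illustration, a minimal sketch of how the uploaded object key is composed under the example configuration above (the prefix and log file name are the placeholder values from the README, and the timestamp pattern is the `yyyyMMdd_HHmmss` format described under Behavior):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Sketch only: reproduces the key layout described in the README.
class S3KeyExample {
  public static void main(String[] args) {
    String prefix = "snowflake-jdbc-logs"; // placeholder s3_prefix from the example
    String fileName = "snowflake_jdbc0.log.0"; // placeholder driver log file name
    String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));

    // The timestamp is inserted before the last extension:
    // snowflake_jdbc0.log.0 -> snowflake_jdbc0.log_20250630_021500.0
    int lastDot = fileName.lastIndexOf('.');
    String timestamped =
        lastDot > 0
            ? fileName.substring(0, lastDot) + "_" + timestamp + fileName.substring(lastDot)
            : fileName + "_" + timestamp;

    // With an empty prefix the file goes to the bucket root; otherwise prefix/fileName.
    System.out.println(prefix.isEmpty() ? timestamped : prefix + "/" + timestamped);
  }
}
```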
diff --git a/build.gradle b/build.gradle
index e308b73..0ccba01 100644
--- a/build.gradle
+++ b/build.gradle
@@ -52,6 +52,37 @@ dependencies {
     compile "org.embulk:embulk-output-jdbc:0.10.2"
     compile "net.snowflake:snowflake-jdbc:3.13.26"
+
+    compile platform("software.amazon.awssdk:bom:2.25.20")
+
+    compile("software.amazon.awssdk:s3") {
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
+        exclude group: "joda-time", module: "joda-time"
+        exclude group: "commons-logging", module: "commons-logging"
+        exclude group: "org.slf4j", module: "slf4j-api"
+    }
+
+    compile("software.amazon.awssdk:regions") {
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
+        exclude group: "joda-time", module: "joda-time"
+        exclude group: "commons-logging", module: "commons-logging"
+        exclude group: "org.slf4j", module: "slf4j-api"
+    }
+
+    compile("software.amazon.awssdk:auth") {
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
+        exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
+        exclude group: "joda-time", module: "joda-time"
+        exclude group: "commons-logging", module: "commons-logging"
+        exclude group: "org.slf4j", module: "slf4j-api"
+    }
+
+    compile "commons-logging:commons-logging:1.2"
 }
 
 embulkPlugin {
     mainClass = "org.embulk.output.SnowflakeOutputPlugin"
diff --git a/gradle/dependency-locks/embulkPluginRuntime.lockfile b/gradle/dependency-locks/embulkPluginRuntime.lockfile
index 9ba3976..512bfa2 100644
--- a/gradle/dependency-locks/embulkPluginRuntime.lockfile
+++ b/gradle/dependency-locks/embulkPluginRuntime.lockfile
@@ -5,11 +5,53 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7
 com.fasterxml.jackson.core:jackson-core:2.6.7
 com.fasterxml.jackson.core:jackson-databind:2.6.7
 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
+commons-codec:commons-codec:1.15
+commons-logging:commons-logging:1.2
+io.netty:netty-buffer:4.1.108.Final
+io.netty:netty-codec-http2:4.1.108.Final
+io.netty:netty-codec-http:4.1.108.Final
+io.netty:netty-codec:4.1.108.Final
+io.netty:netty-common:4.1.108.Final
+io.netty:netty-handler:4.1.108.Final
+io.netty:netty-resolver:4.1.108.Final
+io.netty:netty-transport-classes-epoll:4.1.108.Final
+io.netty:netty-transport-native-unix-common:4.1.108.Final
+io.netty:netty-transport:4.1.108.Final
 javax.validation:validation-api:1.1.0.Final
 net.snowflake:snowflake-jdbc:3.13.26
+org.apache.httpcomponents:httpclient:4.5.13
+org.apache.httpcomponents:httpcore:4.4.13
 org.embulk:embulk-output-jdbc:0.10.2
 org.embulk:embulk-util-config:0.3.0
 org.embulk:embulk-util-json:0.1.1
 org.embulk:embulk-util-retryhelper:0.8.2
 org.embulk:embulk-util-rubytime:0.3.2
 org.embulk:embulk-util-timestamp:0.2.1
+org.reactivestreams:reactive-streams:1.0.4
+software.amazon.awssdk:annotations:2.25.20
+software.amazon.awssdk:apache-client:2.25.20
+software.amazon.awssdk:arns:2.25.20
+software.amazon.awssdk:auth:2.25.20
+software.amazon.awssdk:aws-core:2.25.20
+software.amazon.awssdk:aws-query-protocol:2.25.20
+software.amazon.awssdk:aws-xml-protocol:2.25.20
+software.amazon.awssdk:checksums-spi:2.25.20
+software.amazon.awssdk:checksums:2.25.20
+software.amazon.awssdk:crt-core:2.25.20
+software.amazon.awssdk:endpoints-spi:2.25.20
+software.amazon.awssdk:http-auth-aws:2.25.20
+software.amazon.awssdk:http-auth-spi:2.25.20
+software.amazon.awssdk:http-auth:2.25.20
+software.amazon.awssdk:http-client-spi:2.25.20
+software.amazon.awssdk:identity-spi:2.25.20
+software.amazon.awssdk:json-utils:2.25.20
+software.amazon.awssdk:metrics-spi:2.25.20
+software.amazon.awssdk:netty-nio-client:2.25.20
+software.amazon.awssdk:profiles:2.25.20
+software.amazon.awssdk:protocol-core:2.25.20
+software.amazon.awssdk:regions:2.25.20
+software.amazon.awssdk:s3:2.25.20
+software.amazon.awssdk:sdk-core:2.25.20
+software.amazon.awssdk:third-party-jackson-core:2.25.20
+software.amazon.awssdk:utils:2.25.20
+software.amazon.eventstream:eventstream:1.0.1
diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index 9c01d97..777736d 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
+import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.Types;
@@ -15,6 +16,7 @@ import org.embulk.config.ConfigSource;
 import org.embulk.config.TaskSource;
 import org.embulk.output.jdbc.*;
+import org.embulk.output.s3.JdbcLogUploader;
 import org.embulk.output.snowflake.PrivateKeyReader;
 import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
 import org.embulk.output.snowflake.SnowflakeOutputConnection;
@@ -91,6 +93,30 @@ public interface SnowflakePluginTask extends PluginTask {
     @ConfigDefault("\"none\"")
     public MatchByColumnName getMatchByColumnName();
 
+    @Config("upload_jdbc_log_to_s3")
+    @ConfigDefault("false")
+    public boolean getUploadJdbcLogToS3();
+
+    @Config("s3_bucket")
+    @ConfigDefault("null")
+    public Optional<String> getS3Bucket();
+
+    @Config("s3_prefix")
+    @ConfigDefault("null")
+    public Optional<String> getS3Prefix();
+
+    @Config("s3_region")
+    @ConfigDefault("null")
+    public Optional<String> getS3Region();
+
+    @Config("s3_access_key_id")
+    @ConfigDefault("null")
+    public Optional<String> getS3AccessKeyId();
+
+    @Config("s3_secret_access_key")
+    @ConfigDefault("null")
+    public Optional<String> getS3SecretAccessKey();
+
     public void setCopyIntoTableColumnNames(String[] columnNames);
 
     public String[] getCopyIntoTableColumnNames();
@@ -139,6 +165,9 @@ public static MatchByColumnName fromString(String value) {
   private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115;
   private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195;
+  private static final String ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE =
+      "JDBC driver encountered communication error";
+
   @Override
   protected Class<? extends PluginTask> getTaskClass() {
     return SnowflakePluginTask.class;
   }
@@ -198,6 +227,10 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
     props.setProperty("CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX", "true");
     props.setProperty("MULTI_STATEMENT_COUNT", "0");
 
+    if (t.getUploadJdbcLogToS3()) {
+      props.setProperty("tracing", "ALL");
+    }
+
     props.putAll(t.getOptions());
 
     logConnectionProperties(url, props);
@@ -217,11 +250,53 @@ public ConfigDiff transaction(
     try {
       snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
       snowflakeCon.runCreateStage(stageIdentifier);
+
       configDiff = super.transaction(config, schema, taskCount, control);
       if (t.getDeleteStage()) {
         runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
       }
     } catch (Exception e) {
+      if (e instanceof SQLException) {
+        String message = e.getMessage();
+        if (message != null
+            && message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE)
+            && t.getUploadJdbcLogToS3()) {
+          final Optional<String> s3Bucket = t.getS3Bucket();
+          final Optional<String> s3Prefix = t.getS3Prefix();
+          final Optional<String> s3Region = t.getS3Region();
+          final Optional<String> s3AccessKeyId = t.getS3AccessKeyId();
+          final Optional<String> s3SecretAccessKey = t.getS3SecretAccessKey();
+          if (!s3Bucket.isPresent() || !s3Region.isPresent()) {
+            logger.warn("s3_bucket and s3_region must be set when upload_jdbc_log_to_s3 is true");
+          } else if (s3AccessKeyId.isPresent() != s3SecretAccessKey.isPresent()) {
+            logger.warn(
+                "Both s3_access_key_id and s3_secret_access_key must be set together or omitted.");
+          } else {
+            try (JdbcLogUploader jdbcLogUploader =
+                new JdbcLogUploader(
+                    s3Bucket.get(),
+                    s3Prefix.orElse(""),
+                    s3Region.get(),
+                    s3AccessKeyId.orElse(null),
+                    s3SecretAccessKey.orElse(null))) {
+              String tmpDir = System.getProperty("java.io.tmpdir", "/tmp");
+              File logDir = new File(tmpDir);
+              File[] logFiles = logDir.listFiles((dir, name) -> name.startsWith("snowflake_jdbc"));
+              if (logFiles != null && logFiles.length > 0) {
+                Optional<File> latest =
+                    Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified));
+                if (latest.isPresent()) {
+                  jdbcLogUploader.uploadIfExists(latest.get());
+                }
+              } else {
+                logger.warn("No snowflake_jdbc*.log file found in {} for upload", tmpDir);
+              }
+            } catch (Exception uploadException) {
+              logger.warn("Failed to upload JDBC log to S3: {}", uploadException.getMessage());
+            }
+          }
+        }
+      }
       if (t.getDeleteStage() && t.getDeleteStageOnError()) {
         try {
           runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
diff --git a/src/main/java/org/embulk/output/s3/JdbcLogUploader.java b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java
new file mode 100644
index 0000000..fd693a4
--- /dev/null
+++ b/src/main/java/org/embulk/output/s3/JdbcLogUploader.java
@@ -0,0 +1,87 @@
+package org.embulk.output.s3;
+
+import java.io.File;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+public class JdbcLogUploader implements AutoCloseable {
+  private final Logger logger = LoggerFactory.getLogger(JdbcLogUploader.class);
+
+  private final S3Client s3Client;
+  private final String bucket;
+  private final String prefix;
+  private final Region region;
+
+  public JdbcLogUploader(
+      String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) {
+    this.bucket = bucket;
+    this.prefix = prefix;
+    this.region = Region.of(region);
+
+    if (accessKeyId != null && secretAccessKey != null) {
+      // Use explicit credentials
+      AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
+      this.s3Client =
+          S3Client.builder()
+              .region(this.region)
+              .credentialsProvider(StaticCredentialsProvider.create(awsCreds))
+              .build();
+    } else {
+      // Use default credentials provider (IAM role, environment variables, etc.)
+      this.s3Client =
+          S3Client.builder()
+              .region(this.region)
+              .credentialsProvider(DefaultCredentialsProvider.create())
+              .build();
+    }
+  }
+
+  public void uploadIfExists(File file) {
+    if (!file.exists()) {
+      logger.warn("File not found: {}", file.getAbsolutePath());
+      return;
+    }
+
+    // Add timestamp to filename
+    String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
+    String originalFileName = file.getName();
+    String fileNameWithTimestamp;
+
+    // Insert timestamp before file extension
+    int lastDotIndex = originalFileName.lastIndexOf('.');
+    if (lastDotIndex > 0) {
+      String nameWithoutExt = originalFileName.substring(0, lastDotIndex);
+      String extension = originalFileName.substring(lastDotIndex);
+      fileNameWithTimestamp = nameWithoutExt + "_" + timestamp + extension;
+    } else {
+      fileNameWithTimestamp = originalFileName + "_" + timestamp;
+    }
+
+    String key = prefix.isEmpty() ? fileNameWithTimestamp : prefix + "/" + fileNameWithTimestamp;
+    try {
+      PutObjectRequest putObjectRequest =
+          PutObjectRequest.builder().bucket(bucket).key(key).build();
+
+      s3Client.putObject(putObjectRequest, RequestBody.fromFile(file));
+      logger.info("Uploaded {}", file.getAbsolutePath());
+    } catch (Exception e) {
+      logger.error("Failed to upload {}", file.getAbsolutePath(), e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (s3Client != null) {
+      s3Client.close();
+    }
+  }
+}
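
For reference, a minimal standalone sketch of how `JdbcLogUploader` might be exercised (hypothetical example: the bucket, prefix, region, and log path are placeholders; passing `null` for both credential arguments selects the default AWS credentials provider chain, as in the constructor above):

```java
import java.io.File;
import org.embulk.output.s3.JdbcLogUploader;

class JdbcLogUploaderExample {
  public static void main(String[] args) {
    // Placeholder bucket/prefix/region; null credentials -> default provider chain (IAM role etc.).
    try (JdbcLogUploader uploader =
        new JdbcLogUploader("your-log-bucket", "snowflake-jdbc-logs", "us-east-1", null, null)) {
      // Uploads the file if it exists; the object key becomes
      // snowflake-jdbc-logs/snowflake_jdbc0.log_<yyyyMMdd_HHmmss>.0
      uploader.uploadIfExists(new File("/tmp/snowflake_jdbc0.log.0"));
    }
  }
}
```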