Skip to content

Commit 02a56c3

Browse files
committed
Merge remote-tracking branch 'origin/main' into feature/match_by_column_name
2 parents d552f67 + 60f66fa commit 02a56c3

File tree

5 files changed

+47
-11
lines changed

5 files changed

+47
-11
lines changed

.github/workflows/main.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,20 @@ on:
1010
branches:
1111
- 'main'
1212
types: [opened, synchronize]
13+
pull_request_target:
14+
branches:
15+
- 'main'
16+
types: [labeled]
1317

1418
jobs:
1519
main:
1620
runs-on: ubuntu-latest
21+
if: >
22+
${{
23+
github.event_name == 'pull_request' ||
24+
(github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'safe to test')) ||
25+
startsWith(github.ref, 'refs/tags/')
26+
}}
1727
permissions:
1828
packages: write
1929
contents: read

README.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ Snowflake output plugin for Embulk loads records to Snowflake.
1313
- **host**: database host name (string, required)
1414
- **user**: database login user name (string, required)
1515
- **password**: database login password (string, default: "")
16-
- **privateKey**: database login using key-pair authentication(string, default: ""). This authentication method requires a 2048-bit (minimum) RSA key pair.
16+
- **privateKey**: database login using key-pair authentication(string, default: ""). This authentication method requires a 2048-bit (minimum) RSA key pair.
17+
- **private_key_passphrase**: passphrase for private_key (string, default: "")
1718
- **warehouse**: destination warehouse name (string, required)
1819
- **database**: destination database name (string, required)
1920
- **schema**: destination schema name (string, default: "public")
2021
- **table**: destination table name (string, required)
22+
- **role**: role to execute queries (string, default: "")
2123
- **retry_limit**: max retry count for database operations (integer, default: 12). When intermediate table to create already created by another process, this plugin will retry with another table name to avoid collision.
2224
- **retry_wait**: initial retry wait time in milliseconds (integer, default: 1000 (1 second))
2325
- **max_retry_wait**: upper limit of retry wait, which will be doubled at every retry (integer, default: 1800000 (30 minutes))
@@ -60,10 +62,6 @@ Snowflake output plugin for Embulk loads records to Snowflake.
6062

6163
## Build
6264

63-
## Not implement
64-
- Passphrase for `privateKey` in key-pair authentication.
65-
66-
6765
```
6866
$ ./gradlew gem # -t to watch change of files and rebuild continuously
6967
```

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.sql.Types;
88
import java.util.*;
99
import java.util.function.BiFunction;
10+
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException;
11+
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
1012
import org.embulk.config.ConfigDiff;
1113
import org.embulk.config.ConfigException;
1214
import org.embulk.config.TaskSource;
@@ -47,6 +49,10 @@ public interface SnowflakePluginTask extends PluginTask {
4749
@ConfigDefault("\"\"")
4850
String getPrivateKey();
4951

52+
@Config("private_key_passphrase")
53+
@ConfigDefault("\"\"")
54+
String getPrivateKeyPassphrase();
55+
5056
@Config("database")
5157
public String getDatabase();
5258

@@ -57,6 +63,10 @@ public interface SnowflakePluginTask extends PluginTask {
5763
@ConfigDefault("\"public\"")
5864
public String getSchema();
5965

66+
@Config("role")
67+
@ConfigDefault("\"\"")
68+
public String getRole();
69+
6070
@Config("delete_stage")
6171
@ConfigDefault("false")
6272
public boolean getDeleteStage();
@@ -146,17 +156,21 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
146156
props.setProperty("password", t.getPassword());
147157
} else if (!t.getPrivateKey().isEmpty()) {
148158
try {
149-
props.put("privateKey", PrivateKeyReader.get(t.getPrivateKey()));
150-
} catch (IOException e) {
151-
// Because the source of newConnection definition does not assume IOException, change it to
152-
// ConfigException.
159+
props.put(
160+
"privateKey", PrivateKeyReader.get(t.getPrivateKey(), t.getPrivateKeyPassphrase()));
161+
} catch (IOException | OperatorCreationException | PKCSException e) {
162+
// Since this method is not allowed to throw any checked exception,
163+
// wrap it with ConfigException, which is unchecked.
153164
throw new ConfigException(e);
154165
}
155166
}
156167

157168
props.setProperty("warehouse", t.getWarehouse());
158169
props.setProperty("db", t.getDatabase());
159170
props.setProperty("schema", t.getSchema());
171+
if (!t.getRole().isEmpty()) {
172+
props.setProperty("role", t.getRole());
173+
}
160174

161175
// When CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX is false (default),
162176
// getMetaData().getColumns() returns columns of the tables which table name is

src/main/java/org/embulk/output/snowflake/PrivateKeyReader.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,31 @@
88
import net.snowflake.client.jdbc.internal.org.bouncycastle.jce.provider.BouncyCastleProvider;
99
import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.PEMParser;
1010
import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
11+
import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
12+
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.InputDecryptorProvider;
13+
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException;
14+
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
15+
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
1116

1217
// ref:
1318
// https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-configure#privatekey-property-in-connection-properties
1419
public class PrivateKeyReader {
15-
public static PrivateKey get(String pemString) throws IOException {
20+
public static PrivateKey get(String pemString, String passphrase)
21+
throws IOException, OperatorCreationException, PKCSException {
1622
Security.addProvider(new BouncyCastleProvider());
1723
PEMParser pemParser = new PEMParser(new StringReader(pemString));
1824
Object pemObject = pemParser.readObject();
1925
pemParser.close();
2026

2127
PrivateKeyInfo privateKeyInfo;
22-
if (pemObject instanceof PrivateKeyInfo) {
28+
if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
29+
// Handle the case where the private key is encrypted.
30+
PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo =
31+
(PKCS8EncryptedPrivateKeyInfo) pemObject;
32+
InputDecryptorProvider pkcs8Prov =
33+
new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
34+
privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
35+
} else if (pemObject instanceof PrivateKeyInfo) {
2336
privateKeyInfo = (PrivateKeyInfo) pemObject;
2437
} else {
2538
throw new IllegalArgumentException("Provided PEM does not contain a valid Private Key");

src/test/java/org/embulk/output/snowflake/TestSnowflakeOutputPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public void testConfigDefault() throws Exception {
208208
assertEquals("", task.getUser());
209209
assertEquals("", task.getPassword());
210210
assertEquals("public", task.getSchema());
211+
assertEquals("", task.getRole());
211212
assertEquals(false, task.getDeleteStage());
212213
}
213214

0 commit comments

Comments
 (0)