Skip to content

Commit 19216a7

Browse files
committed
Merge remote-tracking branch 'remotes/origin/main' into feature/match_by_column_name
2 parents 2d4b9bf + 9cfaa02 commit 19216a7

File tree

3 files changed

+42
-29
lines changed

3 files changed

+42
-29
lines changed

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
1212
import org.embulk.config.ConfigDiff;
1313
import org.embulk.config.ConfigException;
14+
import org.embulk.config.ConfigSource;
1415
import org.embulk.config.TaskSource;
1516
import org.embulk.output.jdbc.*;
1617
import org.embulk.output.snowflake.PrivateKeyReader;
@@ -27,8 +28,6 @@
2728
import org.embulk.util.config.ConfigDefault;
2829

2930
public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin {
30-
private StageIdentifier stageIdentifier;
31-
3231
public interface SnowflakePluginTask extends PluginTask {
3332
@Config("driver_path")
3433
@ConfigDefault("null")
@@ -79,6 +78,10 @@ public interface SnowflakePluginTask extends PluginTask {
7978
@ConfigDefault("true")
8079
public boolean getEmtpyFieldAsNull();
8180

81+
@Config("delete_stage_on_error")
82+
@ConfigDefault("false")
83+
public boolean getDeleteStageOnError();
84+
8285
@Config("match_by_column_name")
8386
@ConfigDefault("\"none\"")
8487
public MatchByColumnName getMatchByColumnName();
@@ -188,25 +191,39 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
188191
}
189192

190193
@Override
191-
public ConfigDiff resume(
192-
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
193-
throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
194-
}
195-
196-
@Override
197-
protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount)
198-
throws SQLException {
199-
super.doCommit(con, task, taskCount);
200-
SnowflakeOutputConnection snowflakeCon = (SnowflakeOutputConnection) con;
201-
194+
public ConfigDiff transaction(
195+
ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) {
196+
PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass());
202197
SnowflakePluginTask t = (SnowflakePluginTask) task;
203-
if (this.stageIdentifier == null) {
204-
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
198+
StageIdentifier stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
199+
ConfigDiff configDiff;
200+
SnowflakeOutputConnection snowflakeCon = null;
201+
202+
try {
203+
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
204+
snowflakeCon.runCreateStage(stageIdentifier);
205+
configDiff = super.transaction(config, schema, taskCount, control);
206+
if (t.getDeleteStage()) {
207+
snowflakeCon.runDropStage(stageIdentifier);
208+
}
209+
} catch (Exception e) {
210+
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
211+
try {
212+
snowflakeCon.runDropStage(stageIdentifier);
213+
} catch (SQLException ex) {
214+
throw new RuntimeException(ex);
215+
}
216+
}
217+
throw new RuntimeException(e);
205218
}
206219

207-
if (t.getDeleteStage()) {
208-
snowflakeCon.runDropStage(this.stageIdentifier);
209-
}
220+
return configDiff;
221+
}
222+
223+
@Override
224+
public ConfigDiff resume(
225+
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
226+
throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
210227
}
211228

212229
@Override
@@ -255,20 +272,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
255272
throw new UnsupportedOperationException(
256273
"Snowflake output plugin doesn't support 'merge_direct' mode.");
257274
}
258-
259-
SnowflakePluginTask t = (SnowflakePluginTask) task;
260-
// TODO: put some where executes once
261-
if (this.stageIdentifier == null) {
262-
SnowflakeOutputConnection snowflakeCon =
263-
(SnowflakeOutputConnection) getConnector(task, true).connect(true);
264-
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
265-
snowflakeCon.runCreateStage(this.stageIdentifier);
266-
}
267275
SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
268276

269277
return new SnowflakeCopyBatchInsert(
270278
getConnector(task, true),
271-
this.stageIdentifier,
279+
StageIdentifierHolder.getStageIdentifier(pluginTask),
272280
pluginTask.getCopyIntoTableColumnNames(),
273281
pluginTask.getCopyIntoCSVColumnNumbers(),
274282
false,

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public SnowflakeCopyBatchInsert(
6969
@Override
7070
public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException {
7171
this.connection = (SnowflakeOutputConnection) connector.connect(true);
72-
this.connection.runCreateStage(stageIdentifier);
7372
this.tableIdentifier = loadTable;
7473
}
7574

src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@
1111
import org.embulk.output.jdbc.JdbcSchema;
1212
import org.embulk.output.jdbc.MergeConfig;
1313
import org.embulk.output.jdbc.TableIdentifier;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1416

1517
public class SnowflakeOutputConnection extends JdbcOutputConnection {
18+
private final Logger logger = LoggerFactory.getLogger(SnowflakeOutputConnection.class);
19+
1620
public SnowflakeOutputConnection(Connection connection) throws SQLException {
1721
super(connection, null);
1822
}
@@ -45,11 +49,13 @@ public void runCopy(
4549
public void runCreateStage(StageIdentifier stageIdentifier) throws SQLException {
4650
String sql = buildCreateStageSQL(stageIdentifier);
4751
runUpdate(sql);
52+
logger.info("SQL: {}", sql);
4853
}
4954

5055
public void runDropStage(StageIdentifier stageIdentifier) throws SQLException {
5156
String sql = buildDropStageSQL(stageIdentifier);
5257
runUpdate(sql);
58+
logger.info("SQL: {}", sql);
5359
}
5460

5561
public void runUploadFile(

0 commit comments

Comments
 (0)