Skip to content

Commit 9cfaa02

Browse files
authored
Merge pull request #72 from trocco-io/19356-drop-snowflake-stage
Delete tmp stage even on error and Add delete_stage_on_error option
2 parents 60f66fa + dd8d24a commit 9cfaa02

File tree

3 files changed

+42
-29
lines changed

3 files changed

+42
-29
lines changed

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
99
import org.embulk.config.ConfigDiff;
1010
import org.embulk.config.ConfigException;
11+
import org.embulk.config.ConfigSource;
1112
import org.embulk.config.TaskSource;
1213
import org.embulk.output.jdbc.*;
1314
import org.embulk.output.snowflake.PrivateKeyReader;
@@ -24,8 +25,6 @@
2425
import org.embulk.util.config.ConfigDefault;
2526

2627
public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin {
27-
private StageIdentifier stageIdentifier;
28-
2928
public interface SnowflakePluginTask extends PluginTask {
3029
@Config("driver_path")
3130
@ConfigDefault("null")
@@ -75,6 +74,10 @@ public interface SnowflakePluginTask extends PluginTask {
7574
@Config("empty_field_as_null")
7675
@ConfigDefault("true")
7776
public boolean getEmtpyFieldAsNull();
77+
78+
@Config("delete_stage_on_error")
79+
@ConfigDefault("false")
80+
public boolean getDeleteStageOnError();
7881
}
7982

8083
@Override
@@ -144,25 +147,39 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
144147
}
145148

146149
@Override
147-
public ConfigDiff resume(
148-
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
149-
throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
150-
}
151-
152-
@Override
153-
protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount)
154-
throws SQLException {
155-
super.doCommit(con, task, taskCount);
156-
SnowflakeOutputConnection snowflakeCon = (SnowflakeOutputConnection) con;
157-
150+
public ConfigDiff transaction(
151+
ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) {
152+
PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass());
158153
SnowflakePluginTask t = (SnowflakePluginTask) task;
159-
if (this.stageIdentifier == null) {
160-
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
154+
StageIdentifier stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
155+
ConfigDiff configDiff;
156+
SnowflakeOutputConnection snowflakeCon = null;
157+
158+
try {
159+
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
160+
snowflakeCon.runCreateStage(stageIdentifier);
161+
configDiff = super.transaction(config, schema, taskCount, control);
162+
if (t.getDeleteStage()) {
163+
snowflakeCon.runDropStage(stageIdentifier);
164+
}
165+
} catch (Exception e) {
166+
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
167+
try {
168+
snowflakeCon.runDropStage(stageIdentifier);
169+
} catch (SQLException ex) {
170+
throw new RuntimeException(ex);
171+
}
172+
}
173+
throw new RuntimeException(e);
161174
}
162175

163-
if (t.getDeleteStage()) {
164-
snowflakeCon.runDropStage(this.stageIdentifier);
165-
}
176+
return configDiff;
177+
}
178+
179+
@Override
180+
public ConfigDiff resume(
181+
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
182+
throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
166183
}
167184

168185
@Override
@@ -179,20 +196,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
179196
throw new UnsupportedOperationException(
180197
"Snowflake output plugin doesn't support 'merge_direct' mode.");
181198
}
182-
183-
SnowflakePluginTask t = (SnowflakePluginTask) task;
184-
// TODO: put some where executes once
185-
if (this.stageIdentifier == null) {
186-
SnowflakeOutputConnection snowflakeCon =
187-
(SnowflakeOutputConnection) getConnector(task, true).connect(true);
188-
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
189-
snowflakeCon.runCreateStage(this.stageIdentifier);
190-
}
191199
SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
192200

193201
return new SnowflakeCopyBatchInsert(
194202
getConnector(task, true),
195-
this.stageIdentifier,
203+
StageIdentifierHolder.getStageIdentifier(pluginTask),
196204
false,
197205
pluginTask.getMaxUploadRetries(),
198206
pluginTask.getEmtpyFieldAsNull());

src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public SnowflakeCopyBatchInsert(
6161
@Override
6262
public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException {
6363
this.connection = (SnowflakeOutputConnection) connector.connect(true);
64-
this.connection.runCreateStage(stageIdentifier);
6564
this.tableIdentifier = loadTable;
6665
}
6766

src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@
1111
import org.embulk.output.jdbc.JdbcSchema;
1212
import org.embulk.output.jdbc.MergeConfig;
1313
import org.embulk.output.jdbc.TableIdentifier;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1416

1517
public class SnowflakeOutputConnection extends JdbcOutputConnection {
18+
private final Logger logger = LoggerFactory.getLogger(SnowflakeOutputConnection.class);
19+
1620
public SnowflakeOutputConnection(Connection connection) throws SQLException {
1721
super(connection, null);
1822
}
@@ -32,11 +36,13 @@ public void runCopy(
3236
public void runCreateStage(StageIdentifier stageIdentifier) throws SQLException {
3337
String sql = buildCreateStageSQL(stageIdentifier);
3438
runUpdate(sql);
39+
logger.info("SQL: {}", sql);
3540
}
3641

3742
public void runDropStage(StageIdentifier stageIdentifier) throws SQLException {
3843
String sql = buildDropStageSQL(stageIdentifier);
3944
runUpdate(sql);
45+
logger.info("SQL: {}", sql);
4046
}
4147

4248
public void runUploadFile(

0 commit comments

Comments
 (0)