|
23 | 23 | import org.embulk.util.config.ConfigDefault; |
24 | 24 |
|
25 | 25 | public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin { |
26 | | - private StageIdentifier stageIdentifier; |
27 | | - |
28 | 26 | public interface SnowflakePluginTask extends PluginTask { |
29 | 27 | @Config("driver_path") |
30 | 28 | @ConfigDefault("null") |
@@ -139,21 +137,21 @@ public ConfigDiff transaction( |
139 | 137 | ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { |
140 | 138 | PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass()); |
141 | 139 | SnowflakePluginTask t = (SnowflakePluginTask) task; |
142 | | - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); |
| 140 | + StageIdentifier stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); |
143 | 141 | ConfigDiff configDiff; |
144 | 142 | SnowflakeOutputConnection snowflakeCon = null; |
145 | 143 |
|
146 | 144 | try { |
147 | 145 | snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); |
148 | | - snowflakeCon.runCreateStage(this.stageIdentifier); |
| 146 | + snowflakeCon.runCreateStage(stageIdentifier); |
149 | 147 | configDiff = super.transaction(config, schema, taskCount, control); |
150 | 148 | if (t.getDeleteStage()) { |
151 | | - snowflakeCon.runDropStage(this.stageIdentifier); |
| 149 | + snowflakeCon.runDropStage(stageIdentifier); |
152 | 150 | } |
153 | 151 | } catch (Exception e) { |
154 | 152 | if (t.getDeleteStage() && t.getDeleteStageOnError()) { |
155 | 153 | try { |
156 | | - snowflakeCon.runDropStage(this.stageIdentifier); |
| 154 | + snowflakeCon.runDropStage(stageIdentifier); |
157 | 155 | } catch (SQLException ex) { |
158 | 156 | throw new RuntimeException(ex); |
159 | 157 | } |
@@ -185,11 +183,10 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg |
185 | 183 | "Snowflake output plugin doesn't support 'merge_direct' mode."); |
186 | 184 | } |
187 | 185 | SnowflakePluginTask pluginTask = (SnowflakePluginTask) task; |
188 | | - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(pluginTask); |
189 | 186 |
|
190 | 187 | return new SnowflakeCopyBatchInsert( |
191 | 188 | getConnector(task, true), |
192 | | - this.stageIdentifier, |
| 189 | + StageIdentifierHolder.getStageIdentifier(pluginTask), |
193 | 190 | false, |
194 | 191 | pluginTask.getMaxUploadRetries(), |
195 | 192 | pluginTask.getEmtpyFieldAsNull()); |
|
0 commit comments