Skip to content

Commit 1b7e0a9

Browse files
committed
delete files in advance
1 parent d4a9f67 commit 1b7e0a9

File tree

4 files changed

+195
-0
lines changed

4 files changed

+195
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Google Cloud Storage output plugin for [Embulk](https://github.com/embulk/embulk
2424
- **json_keyfile** fullpath of json_key (string, required when auth_method is json_key)
2525
- **application_name**: Application name, anything you like (string, optional, default value is "embulk-output-gcs")
2626
- **max_connection_retry**: Number of connection retries to GCS (number, default value is 10)
27+
- **delete_in_advance**: Delete Bucket/Prefix matched files in advance (boolean, default value is false)
2728

2829
## Example
2930

src/main/java/org/embulk/output/GcsOutputPlugin.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.embulk.output;
22

33
import com.google.api.services.storage.Storage;
4+
import com.google.api.services.storage.model.StorageObject;
5+
import com.google.api.services.storage.model.Objects;
46
import com.google.common.base.Throwables;
57
import org.embulk.config.ConfigDiff;
68
import org.embulk.config.ConfigException;
@@ -11,15 +13,25 @@
1113
import org.embulk.spi.FileOutputPlugin;
1214
import org.embulk.spi.TransactionalFileOutput;
1315
import org.embulk.spi.unit.LocalFile;
16+
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
17+
import org.embulk.spi.util.RetryExecutor.Retryable;
18+
import org.slf4j.Logger;
1419

1520
import java.io.IOException;
21+
import java.io.InterruptedIOException;
1622
import java.security.GeneralSecurityException;
23+
import java.util.ArrayList;
24+
import java.util.LinkedList;
1725
import java.util.List;
1826
import java.util.Optional;
1927
import java.util.function.Function;
2028

29+
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
30+
2131
public class GcsOutputPlugin implements FileOutputPlugin
2232
{
33+
private static final Logger logger = Exec.getLogger(GcsOutputPlugin.class);
34+
2335
@Override
2436
public ConfigDiff transaction(ConfigSource config,
2537
int taskCount,
@@ -50,6 +62,10 @@ else if (task.getAuthMethod().getString().equals("private_key")) {
5062
}
5163
}
5264

65+
if (task.getDeleteInAdvance()) {
66+
deleteFiles(task);
67+
}
68+
5369
return resume(task.dump(), taskCount, control);
5470
}
5571

@@ -78,6 +94,133 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
7894
return new GcsTransactionalFileOutput(task, client, taskIndex);
7995
}
8096

97+
private void deleteFiles(PluginTask task)
98+
{
99+
logger.info("Start delete files operation");
100+
Storage client = createClient(task);
101+
try {
102+
List<StorageObject> items = listObjectsWithRetry(client, task.getBucket(), task.getPathPrefix(), task.getMaxConnectionRetry());
103+
if (items.size() == 0) {
104+
logger.info("no files were found");
105+
return;
106+
}
107+
for (StorageObject item : items) {
108+
deleteObjectWithRetry(client, item, task.getMaxConnectionRetry());
109+
logger.info("delete file: {}/{}", item.getBucket(), item.getName());
110+
}
111+
}
112+
catch (IOException ex) {
113+
throw new ConfigException(ex);
114+
}
115+
}
116+
117+
private List<StorageObject> listObjectsWithRetry(Storage client, String bucket, String prefix, int maxConnectionRetry) throws IOException
118+
{
119+
try {
120+
return retryExecutor()
121+
.withRetryLimit(maxConnectionRetry)
122+
.withInitialRetryWait(500)
123+
.withMaxRetryWait(30 * 1000)
124+
.runInterruptible(new Retryable<List<StorageObject>>() {
125+
@Override
126+
public List<StorageObject> call() throws IOException
127+
{
128+
Storage.Objects.List listObjects = client.objects().list(bucket).setDelimiter("/").setPrefix(prefix);
129+
List<StorageObject> items = new LinkedList<StorageObject>();
130+
String token = null;
131+
do {
132+
Objects objects = listObjects.execute();
133+
if (objects.getItems() == null) {
134+
break;
135+
}
136+
items.addAll(objects.getItems());
137+
token = objects.getNextPageToken();
138+
listObjects.setPageToken(token);
139+
} while (token != null);
140+
return items;
141+
}
142+
143+
@Override
144+
public boolean isRetryableException(Exception exception)
145+
{
146+
return true;
147+
}
148+
149+
@Override
150+
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
151+
{
152+
String message = String.format("GCS list request failed. Retrying %d/%d after %d seconds. Message: %s: %s",
153+
retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage());
154+
if (retryCount % 3 == 0) {
155+
logger.warn(message, exception);
156+
}
157+
else {
158+
logger.warn(message);
159+
}
160+
}
161+
162+
@Override
163+
public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
164+
{
165+
}
166+
});
167+
}
168+
catch (RetryGiveupException ex) {
169+
throw Throwables.propagate(ex.getCause());
170+
}
171+
catch (InterruptedException ex) {
172+
throw new InterruptedIOException();
173+
}
174+
}
175+
176+
private Void deleteObjectWithRetry(Storage client, StorageObject item, int maxConnectionRetry) throws IOException
177+
{
178+
try {
179+
return retryExecutor()
180+
.withRetryLimit(maxConnectionRetry)
181+
.withInitialRetryWait(500)
182+
.withMaxRetryWait(30 * 1000)
183+
.runInterruptible(new Retryable<Void>() {
184+
@Override
185+
public Void call() throws IOException
186+
{
187+
client.objects().delete(item.getBucket(), item.getName()).execute();
188+
return null;
189+
}
190+
191+
@Override
192+
public boolean isRetryableException(Exception exception)
193+
{
194+
return true;
195+
}
196+
197+
@Override
198+
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
199+
{
200+
String message = String.format("GCS delete request failed. Retrying %d/%d after %d seconds. Message: %s: %s",
201+
retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage());
202+
if (retryCount % 3 == 0) {
203+
logger.warn(message, exception);
204+
}
205+
else {
206+
logger.warn(message);
207+
}
208+
}
209+
210+
@Override
211+
public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
212+
{
213+
}
214+
});
215+
}
216+
catch (RetryGiveupException ex) {
217+
throw Throwables.propagate(ex.getCause());
218+
}
219+
catch (InterruptedException ex) {
220+
throw new InterruptedIOException();
221+
}
222+
}
223+
81224
private GcsAuthentication newGcsAuth(PluginTask task)
82225
{
83226
try {

src/main/java/org/embulk/output/PluginTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,8 @@ public interface PluginTask extends Task
5555
@Config("max_connection_retry")
5656
@ConfigDefault("10") // 10 times retry to connect GCS server if failed.
5757
int getMaxConnectionRetry();
58+
59+
@Config("delete_in_advance")
60+
@ConfigDefault("false")
61+
boolean getDeleteInAdvance();
5862
}

src/test/java/org/embulk/output/TestGcsOutputPlugin.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package org.embulk.output;
22

3+
import com.google.api.client.http.InputStreamContent;
34
import com.google.api.services.storage.Storage;
5+
import com.google.api.services.storage.Storage.Objects.Get;
6+
import com.google.api.services.storage.model.StorageObject;
47
import com.google.common.base.Optional;
58
import com.google.common.collect.ImmutableList;
69
import com.google.common.collect.ImmutableMap;
@@ -28,7 +31,9 @@
2831
import static org.junit.Assume.assumeNotNull;
2932

3033
import java.io.BufferedReader;
34+
import java.io.ByteArrayInputStream;
3135
import java.io.ByteArrayOutputStream;
36+
import java.io.File;
3237
import java.io.FileInputStream;
3338
import java.io.IOException;
3439
import java.io.InputStream;
@@ -99,6 +104,7 @@ public void checkDefaultValues()
99104

100105
PluginTask task = config.loadConfig(PluginTask.class);
101106
assertEquals("private_key", task.getAuthMethod().toString());
107+
assertEquals(false, task.getDeleteInAdvance());
102108
}
103109

104110
// p12_keyfile is null when auth_method is private_key
@@ -296,6 +302,30 @@ public void testGenerateRemotePath() throws Exception
296302
assertEquals("sample.000.01.csv", method.invoke(plugin, "......///sample", task.getSequenceFormat(), 0, 1, ".csv"));
297303
}
298304

305+
@Test
306+
public void testDeleteFilesInAdvanceSuccessfully() throws Exception
307+
{
308+
ConfigSource configSource = config();
309+
PluginTask task = configSource.loadConfig(PluginTask.class);
310+
311+
Method createClientMethod = GcsOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
312+
createClientMethod.setAccessible(true);
313+
Storage client = (Storage) createClientMethod.invoke(plugin, task);
314+
315+
uploadEmptyFile(client, task.getBucket(), task.getPathPrefix() + ".001.csv");
316+
uploadEmptyFile(client, task.getBucket(), task.getPathPrefix() + ".002.csv");
317+
318+
assertEquals(true, isFileExist(client, task.getBucket(), task.getPathPrefix() + ".001.csv"));
319+
assertEquals(true, isFileExist(client, task.getBucket(), task.getPathPrefix() + ".002.csv"));
320+
321+
Method deleteFilesMethod = GcsOutputPlugin.class.getDeclaredMethod("deleteFiles", PluginTask.class);
322+
deleteFilesMethod.setAccessible(true);
323+
deleteFilesMethod.invoke(plugin, task);
324+
325+
assertEquals(false, isFileExist(client, task.getBucket(), task.getPathPrefix() + ".001.csv"));
326+
assertEquals(false, isFileExist(client, task.getBucket(), task.getPathPrefix() + ".002.csv"));
327+
}
328+
299329
public ConfigSource config()
300330
{
301331
return Exec.newConfigSource()
@@ -416,6 +446,23 @@ private ImmutableList<List<String>> getFileContentsFromGcs(String path) throws E
416446
return builder.build();
417447
}
418448

449+
private static void uploadEmptyFile(Storage client, String gcsBucket, String gcsPath) throws IOException
450+
{
451+
InputStreamContent content = new InputStreamContent("text/plain", new ByteArrayInputStream(new byte[0]));
452+
StorageObject object = new StorageObject().setName(gcsPath);
453+
client.objects().insert(gcsBucket, object, content).execute();
454+
}
455+
456+
private static boolean isFileExist(Storage client, String gcsBucket, String gcsPath)
457+
{
458+
try {
459+
client.objects().get(gcsBucket, gcsPath).setAlt("json").execute();
460+
} catch (IOException ex) {
461+
return false;
462+
}
463+
return true;
464+
}
465+
419466
private static String getDirectory(String dir)
420467
{
421468
if (dir != null && !dir.endsWith("/")) {

0 commit comments

Comments
 (0)