From fb782af61fbe8b23c353dc457d9ea78120fbebac Mon Sep 17 00:00:00 2001 From: chikamura Date: Tue, 29 Oct 2024 15:35:52 +0900 Subject: [PATCH] delete files in advance --- README.md | 1 + .../embulk/output/gcs/GcsOutputPlugin.java | 138 ++++++++++++++++++ .../org/embulk/output/gcs/PluginTask.java | 4 + .../output/gcs/TestGcsOutputPlugin.java | 35 +++++ 4 files changed, 178 insertions(+) diff --git a/README.md b/README.md index d240439..ef53c2f 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Google Cloud Storage output plugin for [Embulk](https://github.com/embulk/embulk - **json_keyfile** fullpath of json_key (string, required when auth_method is json_key) - **application_name**: Application name, anything you like (string, optional, default value is "embulk-output-gcs") - **max_connection_retry**: Number of connection retries to GCS (number, default value is 10) +- **delete_in_advance**: Delete Bucket/Prefix matched files in advance (boolean, default value is false) ## Example diff --git a/src/main/java/org/embulk/output/gcs/GcsOutputPlugin.java b/src/main/java/org/embulk/output/gcs/GcsOutputPlugin.java index 8ae1904..f8cf11a 100644 --- a/src/main/java/org/embulk/output/gcs/GcsOutputPlugin.java +++ b/src/main/java/org/embulk/output/gcs/GcsOutputPlugin.java @@ -16,8 +16,12 @@ package org.embulk.output.gcs; +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; import com.google.cloud.storage.Storage; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; @@ -29,14 +33,23 @@ import org.embulk.util.config.ConfigMapperFactory; import org.embulk.util.config.TaskMapper; import org.embulk.util.config.units.LocalFile; +import org.embulk.util.retryhelper.RetryExecutor; +import org.embulk.util.retryhelper.RetryGiveupException; +import org.embulk.util.retryhelper.Retryable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.invoke.MethodHandles; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; public class GcsOutputPlugin implements FileOutputPlugin { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder() .addDefaultModules().build(); public static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper(); @@ -71,6 +84,10 @@ else if (task.getAuthMethod().getString().equals("private_key")) { } } + if (task.getDeleteInAdvance()) { + deleteFiles(task); + } + return resume(task.toTaskSource(), taskCount, control); } @@ -120,4 +137,125 @@ public Storage createClient(final PluginTask task) throw new RuntimeException(ex); } } + + public void deleteFiles(PluginTask task) + { + logger.info("Start delete files operation"); + Storage client = createClient(task); + try { + List blobIds = listObjectsWithRetry(client, task.getBucket(), task.getPathPrefix(), task.getMaxConnectionRetry()); + if (blobIds.isEmpty()) { + logger.info("no files were found"); + return; + } + for (BlobId blobId : blobIds) { + deleteObjectWithRetry(client, blobId, task.getMaxConnectionRetry()); + logger.info("delete file: {}/{}", blobId.getBucket(), blobId.getName()); + } + } + catch (IOException ex) { + throw new ConfigException(ex); + } + } + + private List listObjectsWithRetry(Storage client, String bucket, String prefix, int maxConnectionRetry) throws IOException + { + try { + return RetryExecutor.builder() + .withRetryLimit(maxConnectionRetry) + .withInitialRetryWaitMillis(500) + .withMaxRetryWaitMillis(30 * 1000) + .build() + .runInterruptible(new Retryable>() { + @Override + public List call() throws IOException + { + // https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix#storage_list_files_with_prefix-java + Page list = client.list(bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.currentDirectory()); + List blobIds = new ArrayList<>(); + list.iterateAll().forEach(x -> blobIds.add(x.getBlobId())); + return blobIds; + } + + @Override + public boolean isRetryableException(Exception exception) + { + return true; + } + + @Override + public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException + { + String message = String.format("GCS list request failed. Retrying %d/%d after %d seconds. Message: %s: %s", + retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage()); + if (retryCount % 3 == 0) { + logger.warn(message, exception); + } + else { + logger.warn(message); + } + } + + @Override + public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException + { + } + }); + } + catch (RetryGiveupException ex) { + throw Throwables.propagate(ex.getCause()); + } + catch (InterruptedException ex) { + throw new InterruptedIOException(); + } + } + + private Void deleteObjectWithRetry(Storage client, BlobId blobId, int maxConnectionRetry) throws IOException + { + try { + return RetryExecutor.builder() + .withRetryLimit(maxConnectionRetry) + .withInitialRetryWaitMillis(500) + .withMaxRetryWaitMillis(30 * 1000) + .build() + .runInterruptible(new Retryable() { + @Override + public Void call() throws IOException + { + client.delete(blobId); + return null; + } + + @Override + public boolean isRetryableException(Exception exception) + { + return true; + } + + @Override + public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException + { + String message = String.format("GCS delete request failed. Retrying %d/%d after %d seconds. Message: %s: %s", + retryCount, retryLimit, retryWait / 1000, exception.getClass(), exception.getMessage()); + if (retryCount % 3 == 0) { + logger.warn(message, exception); + } + else { + logger.warn(message); + } + } + + @Override + public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException + { + } + }); + } + catch (RetryGiveupException ex) { + throw Throwables.propagate(ex.getCause()); + } + catch (InterruptedException ex) { + throw new InterruptedIOException(); + } + } } diff --git a/src/main/java/org/embulk/output/gcs/PluginTask.java b/src/main/java/org/embulk/output/gcs/PluginTask.java index 6a95dc8..75810f3 100644 --- a/src/main/java/org/embulk/output/gcs/PluginTask.java +++ b/src/main/java/org/embulk/output/gcs/PluginTask.java @@ -87,4 +87,8 @@ public interface PluginTask extends Task @Config("key_pass") @ConfigDefault("\"notasecret\"") String getKeyPass(); + + @Config("delete_in_advance") + @ConfigDefault("false") + boolean getDeleteInAdvance(); } diff --git a/src/test/java/org/embulk/output/gcs/TestGcsOutputPlugin.java b/src/test/java/org/embulk/output/gcs/TestGcsOutputPlugin.java index 0d2aace..c0ed8ce 100644 --- a/src/test/java/org/embulk/output/gcs/TestGcsOutputPlugin.java +++ b/src/test/java/org/embulk/output/gcs/TestGcsOutputPlugin.java @@ -18,6 +18,7 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -48,6 +49,7 @@ import static org.embulk.output.gcs.GcsOutputPlugin.CONFIG_MAPPER; import static org.embulk.output.gcs.GcsOutputPlugin.CONFIG_MAPPER_FACTORY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeNotNull; @@ -138,6 +140,7 @@ public void checkDefaultValues() PluginTask task = CONFIG_MAPPER.map(config, PluginTask.class); assertEquals("private_key", task.getAuthMethod().toString()); + assertFalse(task.getDeleteInAdvance()); } // p12_keyfile is null when auth_method is private_key @@ -302,6 +305,26 @@ public void testGenerateRemotePath() throws Exception assertEquals("sample.000.01.csv", fileOutput.generateRemotePath("......///sample", task.getSequenceFormat(), 0, 1, ".csv")); } + @Test + public void testDeleteFilesInAdvanceSuccessfully() throws Exception + { + ConfigSource configSource = config(); + PluginTask task = CONFIG_MAPPER.map(configSource, PluginTask.class); + Storage client = plugin.createClient(task); + + // Even if a file exists, it will be a success. + uploadEmptyFile(client, task.getBucket(), task.getPathPrefix() + ".001.csv"); + uploadEmptyFile(client, task.getBucket(), task.getPathPrefix() + ".002.csv"); + + assertTrue(isFileExist(client, task.getBucket(), task.getPathPrefix() + ".001.csv")); + assertTrue(isFileExist(client, task.getBucket(), task.getPathPrefix() + ".002.csv")); + + plugin.deleteFiles(task); + + assertFalse(isFileExist(client, task.getBucket(), task.getPathPrefix() + ".001.csv")); + assertFalse(isFileExist(client, task.getBucket(), task.getPathPrefix() + ".002.csv")); + } + public ConfigSource config() { byte[] keyBytes = Base64.getDecoder().decode(GCP_P12_KEYFILE.get()); @@ -409,4 +432,16 @@ private byte[] convertInputStreamToByte(InputStream is) throws IOException } return bo.toByteArray(); } + + private static void uploadEmptyFile(Storage client, String gcsBucket, String gcsPath) throws IOException + { + BlobId blobId = BlobId.of(gcsBucket, gcsPath); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + client.create(blobInfo, new byte[0]); + } + + private static boolean isFileExist(Storage client, String gcsBucket, String gcsPath) + { + return client.get(BlobId.of(gcsBucket, gcsPath)) != null; + } }