diff --git a/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3Uploader.java b/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3Uploader.java new file mode 100644 index 000000000..a96911ea3 --- /dev/null +++ b/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3Uploader.java @@ -0,0 +1,82 @@ +package io.nosqlbench.engine.extensions.s3uploader; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.transfer.MultipleFileUpload; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import com.codahale.metrics.MetricRegistry; +import io.nosqlbench.nb.addins.s3.s3urlhandler.S3ClientCache; +import io.nosqlbench.nb.addins.s3.s3urlhandler.S3UrlFields; +import io.nosqlbench.nb.api.NBEnvironment; +import io.nosqlbench.nb.api.metadata.ScenarioMetadata; +import io.nosqlbench.nb.api.metadata.ScenarioMetadataAware; +import org.apache.logging.log4j.Logger; + +import javax.script.ScriptContext; +import java.io.File; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.util.Map; + +public class S3Uploader implements ScenarioMetadataAware { + private final Logger logger; + private final MetricRegistry metricRegistry; + private final ScriptContext scriptContext; + private ScenarioMetadata scenarioMetadata; + + public S3Uploader(Logger logger, MetricRegistry metricRegistry, ScriptContext scriptContext) { + this.logger = logger; + this.metricRegistry = metricRegistry; + this.scriptContext = scriptContext; + } + + public String uploadDirToUrl(String localFilePath, String urlTemplate) { + return uploadDirToUrlTokenized(localFilePath, urlTemplate, Map.of()); + } + + /** + * Upload the local file path to the specified URL, and then return the URL of the + * bucket in its fully expanded form. + * This requires you to specify a valid S3 url with place-holder tokens, such as + *
{@code s3://bucketname/prefix1/prefix2/DATA}
+ * Before processing, some tokens will be automatically expanded based on local node + * info. These tokens include: + * + * @return The bucket URL of the expaneded name. + */ + public String uploadDirToUrlTokenized(String localFilePath, String urlTemplate, Map params) { + + + Path sourcePath = Path.of(localFilePath); + if (!FileSystems.getDefault().equals(sourcePath.getFileSystem())) { + throw new RuntimeException("The file must reside on the default filesystem to be uploaded by S3."); + } + if (!Files.isDirectory(sourcePath, LinkOption.NOFOLLOW_LINKS)) { + throw new RuntimeException("path '" + sourcePath + "' is not a directory."); + } + File sourceDir = sourcePath.toFile(); + + String url = NBEnvironment.INSTANCE + .interpolateWithTimestamp(urlTemplate,scenarioMetadata.getStartedAt(),params) + .orElseThrow(); + S3UrlFields fields = S3UrlFields.fromURLString(url); + S3ClientCache s3ClientCache = new S3ClientCache(); + AmazonS3 s3 = s3ClientCache.get(fields); + TransferManager xfers = TransferManagerBuilder.standard().withS3Client(s3).build(); + String prefix = fields.key; + MultipleFileUpload mfu = xfers.uploadDirectory(fields.bucket, prefix, sourceDir, true); + try { + mfu.waitForCompletion(); + } catch (InterruptedException e) { + throw new RuntimeException("Multi-file upload was interrupted."); + } + return url; + } + + @Override + public void setScenarioMetadata(ScenarioMetadata metadata) { + this.scenarioMetadata = metadata; + } +} diff --git a/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3UploaderPluginData.java b/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3UploaderPluginData.java new file mode 100644 index 000000000..9f59f1acd --- /dev/null +++ b/engine-extensions/src/main/java/io/nosqlbench/engine/extensions/s3uploader/S3UploaderPluginData.java @@ -0,0 +1,32 @@ +package io.nosqlbench.engine.extensions.s3uploader; + +import com.codahale.metrics.MetricRegistry; +import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo; +import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.metadata.ScenarioMetadata; +import io.nosqlbench.nb.api.metadata.ScenarioMetadataAware; +import org.apache.logging.log4j.Logger; + +import javax.script.ScriptContext; + +@Service(value = ScriptingPluginInfo.class, selector = "s3") +public class S3UploaderPluginData implements ScriptingPluginInfo, ScenarioMetadataAware { + private ScenarioMetadata scenarioMetadata; + + @Override + public String getDescription() { + return "Allow for uploading or downloading a directory from S3"; + } + + @Override + public S3Uploader getExtensionObject(Logger logger, MetricRegistry metricRegistry, ScriptContext scriptContext) { + S3Uploader uploader = new S3Uploader(logger, metricRegistry, scriptContext); + ScenarioMetadataAware.apply(uploader,scenarioMetadata); + return uploader; + } + + @Override + public void setScenarioMetadata(ScenarioMetadata metadata) { + this.scenarioMetadata = metadata; + } +}