add S3 URL Handler

This commit is contained in:
Jonathan Shook 2021-10-06 17:49:27 -05:00
parent 9c91ad7136
commit 1d625f411a
7 changed files with 262 additions and 0 deletions

View File

@ -64,6 +64,16 @@
<artifactId>oshi-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.12</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.4.0-b180830.0359</version>
</dependency>
<!-- perf testing -->
<dependency>

View File

@ -0,0 +1,41 @@
package io.nosqlbench.nb.addins.s3urls;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.util.WeakHashMap;
/**
* This client cache uses the credentials provided in a URL to create
* a fingerprint, and then creates a customized S3 client for each unique
* instance. If these clients are not used, they are allowed to be expired
* from the map and collected.
*/
public class S3ClientCache {
private final WeakHashMap<S3UrlFields.CredentialsFingerprint, AmazonS3> cache = new WeakHashMap<>();
public S3ClientCache() {
}
public AmazonS3 get(S3UrlFields fields) {
AmazonS3 s3 = cache.computeIfAbsent(fields.credentialsFingerprint(),
cfp -> createAuthorizedClient(fields));
return s3;
}
private AmazonS3 createAuthorizedClient(S3UrlFields fields) {
if (fields.accessKey!=null && fields.secretKey!=null) {
AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
AWSCredentials specialcreds = new BasicAWSCredentials(fields.accessKey, fields.secretKey);
builder = builder.withCredentials(new AWSStaticCredentialsProvider(specialcreds));
return builder.build();
} else {
return AmazonS3ClientBuilder.defaultClient();
}
}
}

View File

@ -0,0 +1,31 @@
package io.nosqlbench.nb.addins.s3urls;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3Object;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
public class S3UrlConnection extends URLConnection {
private final S3ClientCache clientCache;
protected S3UrlConnection(S3ClientCache clientCache, URL url) {
super(url);
this.clientCache = clientCache;
}
@Override
public InputStream getInputStream() throws IOException {
S3UrlFields fields = new S3UrlFields(url);
AmazonS3 s3 = clientCache.get(fields);
S3Object object = s3.getObject(fields.bucket, fields.key);
return object.getObjectContent();
}
@Override
public void connect() throws IOException {
}
}

View File

@ -0,0 +1,85 @@
package io.nosqlbench.nb.addins.s3urls;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class S3UrlFields {
public final String bucket;
public final String key;
public final String secretKey;
public final String accessKey;
private final String endpoint;
public S3UrlFields(URL url) {
String accessKey = null;
String secretKey = null;
String userinfo = url.getUserInfo();
if (userinfo != null) {
String[] userfields = userinfo.split(":", 2);
accessKey = URLDecoder.decode(userfields[0], StandardCharsets.UTF_8);
secretKey = URLDecoder.decode(userfields[1], StandardCharsets.UTF_8);
} else {
String query = url.getQuery();
if (query != null) {
for (String qs : query.split("&")) {
String[] words = qs.split(":", 2);
if (words[0].equals("accessKey")) {
accessKey = URLDecoder.decode(words[1], StandardCharsets.UTF_8);
} else if (words[0].equals("secretKey")) {
secretKey = URLDecoder.decode(words[1], StandardCharsets.UTF_8);
}
}
}
}
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html
this.accessKey = accessKey;
this.secretKey = secretKey;
String[] bucketAndEndpoint = url.getHost().split("\\.", 2);
this.bucket = bucketAndEndpoint[0];
this.endpoint = (bucketAndEndpoint.length==2) ? bucketAndEndpoint[1] : "";
this.key = url.getPath().substring(1);
}
public CredentialsFingerprint credentialsFingerprint() {
return new CredentialsFingerprint(this);
}
public static class CredentialsFingerprint {
private final S3UrlFields fields;
public CredentialsFingerprint(S3UrlFields fields) {
this.fields = fields;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
S3UrlFields that = (S3UrlFields) o;
if (!Objects.equals(fields.secretKey, that.secretKey)) return false;
if (!Objects.equals(fields.accessKey, that.accessKey)) return false;
return Objects.equals(fields.endpoint, that.endpoint);
}
@Override
public int hashCode() {
int result = (fields.secretKey != null ? fields.secretKey.hashCode() : 0);
result = 31 * result + (fields.accessKey != null ? fields.accessKey.hashCode() : 0);
result = 31 * result + (fields.endpoint != null ? fields.endpoint.hashCode() : 0);
return result;
}
}
}

View File

@ -0,0 +1,21 @@
package io.nosqlbench.nb.addins.s3urls;
import java.io.IOException;
import java.net.URL;
import java.net.URLStreamHandler;
public class S3UrlStreamHandler extends URLStreamHandler {
private final S3ClientCache clientCache;
private final String protocol;
public S3UrlStreamHandler(S3ClientCache clientCache, String protocol) {
this.clientCache = clientCache;
this.protocol = protocol;
}
@Override
protected S3UrlConnection openConnection(URL url) throws IOException {
return new S3UrlConnection(clientCache, url);
}
}

View File

@ -0,0 +1,21 @@
package io.nosqlbench.nb.addins.s3urls;
import io.nosqlbench.nb.annotations.Service;
import java.net.URLStreamHandler;
import java.net.spi.URLStreamHandlerProvider;
@Service(value = URLStreamHandlerProvider.class, selector = "s3")
public class S3UrlStreamHandlerProvider extends URLStreamHandlerProvider {
private final S3ClientCache clientCache = new S3ClientCache();
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if ("s3".equals(protocol)) {
return new S3UrlStreamHandler(clientCache, protocol);
}
return null;
}
}

View File

@ -0,0 +1,53 @@
package io.nosqlbench.nb.addins.s3urls;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
public class S3UrlStreamHandlerTest {
/**
* This test requires that you have credentials already configured on your local system
* for S3. It creates an object using the s3 client directly, then uses a generic
* URL method to access and verify the contents.
*/
@Disabled
@Test
public void sanityCheckS3UrlHandler() {
AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
String bucketName = "nb-extension-test";
String keyName = "key-name";
String testValue = "test-value";
Bucket bucket = null;
if (!client.doesBucketExistV2(bucketName)) {
bucket = client.createBucket(bucketName);
}
PutObjectResult putObjectResult = client.putObject(bucketName, keyName, testValue);
assertThat(putObjectResult).isNotNull();
try {
URL url = new URL("s3://"+bucketName+"/"+keyName);
InputStream is = url.openStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line = br.readLine();
assertThat(line).isEqualTo(testValue);
System.out.println(line);
} catch (Exception e) {
e.printStackTrace();
}
}
}