initial ddb driver on new driver api

This commit is contained in:
Jonathan Shook 2021-12-20 10:03:32 -06:00
parent 66c07a0ae3
commit 0b0602c436
16 changed files with 555 additions and 3 deletions

View File

@ -0,0 +1,7 @@
package io.nosqlbench.adapter.dynamodb;
public enum DynamoDBCmdType {
CreateTable,
PutItem,
GetItem;
}

View File

@ -0,0 +1,34 @@
package io.nosqlbench.adapter.dynamodb;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.annotations.Stability;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "dynamodb", maturity = Stability.Experimental)
public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoDBSpace> {
@Override
public OpMapper<DynamoDBOp> getOpMapper() {
DriverSpaceCache<? extends DynamoDBSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new DynamoDBOpMapper(adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends DynamoDBSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new DynamoDBSpace(s,cfg);
}
@Override
public NBConfigModel getConfigModel() {
return DynamoDBSpace.getConfigModel();
}
}

View File

@ -0,0 +1,55 @@
package io.nosqlbench.adapter.dynamodb;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBCreateTableOpDispenser;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBGetItemOpDispenser;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBPutItemOpDispenser;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.NamedTarget;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.errors.OpConfigError;
public class DynamoDBOpMapper implements OpMapper<DynamoDBOp> {
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends DynamoDBSpace> cache;
public DynamoDBOpMapper(NBConfiguration cfg, DriverSpaceCache<? extends DynamoDBSpace> cache) {
this.cfg = cfg;
this.cache = cache;
}
@Override
public OpDispenser<DynamoDBOp> apply(ParsedOp cmd) {
String space = cmd.getStaticConfigOr("space", "default");
DynamoDB ddb = cache.get(space).getDynamoDB();
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further
* specialized type-checking or op-type specific features
*/
if (cmd.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
// return new RawDynamoDBOpDispenser(cmd);
} else {
NamedTarget<DynamoDBCmdType> cmdType = cmd.getRequiredTypeFromEnum(DynamoDBCmdType.class);
switch (cmdType.enumId) {
case CreateTable:
return new DDBCreateTableOpDispenser(ddb,cmd,cmdType.targetFunction);
case PutItem:
return new DDBPutItemOpDispenser(ddb,cmd,cmdType.targetFunction);
case GetItem:
return new DDBGetItemOpDispenser(ddb,cmd,cmdType.targetFunction);
default:
throw new OpConfigError("No implementation for " + cmdType.toString());
}
}
}
}

View File

@ -0,0 +1,56 @@
package io.nosqlbench.adapter.dynamodb;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.OpConfigError;
import java.util.Optional;
public class DynamoDBSpace {
private final String name;
DynamoDB dynamoDB;
public DynamoDBSpace(String name, NBConfiguration cfg) {
this.name = name;
AmazonDynamoDB client = createClient(cfg);
dynamoDB= new DynamoDB(client);
}
public DynamoDB getDynamoDB() {
return dynamoDB;
}
private AmazonDynamoDB createClient(NBConfiguration cfg) {
AmazonDynamoDBClientBuilder builder = AmazonDynamoDBClientBuilder.standard();
Optional<String> region = cfg.getOptional("region");
Optional<String> endpoint = cfg.getOptional("endpoint");
Optional<String> signing_region = cfg.getOptional("signing_region");
if (region.isPresent() && (endpoint.isPresent() || signing_region.isPresent())) {
throw new OpConfigError("If you specify region, endpoint and signing_region option are ambiguous");
}
if (region.isPresent()) {
builder.withRegion(region.get());
} else if (endpoint.isPresent() && signing_region.isPresent()){
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint.get(), signing_region.get());
builder = builder.withEndpointConfiguration(endpointConfiguration);
} else {
throw new OpConfigError("Either region or endpoint and signing_region options are required.");
}
return builder.build();
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(DynamoDBSpace.class)
.add(Param.optional("endpoint"))
.add(Param.optional("signing_region"))
.add(Param.optional("region"))
.asReadOnly();
}
}

View File

@ -0,0 +1,152 @@
package io.nosqlbench.adapter.dynamodb.opdispensers;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.*;
import io.nosqlbench.adapter.dynamodb.optypes.DDBCreateTableOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;
/**
* <pre>{@code
* Request Syntax
* {
* "AttributeDefinitions": [
* {
* "AttributeName": "string",
* "AttributeType": "string"
* }
* ],
* "BillingMode": "string",
* "GlobalSecondaryIndexes": [
* {
* "IndexName": "string",
* "KeySchema": [
* {
* "AttributeName": "string",
* "KeyType": "string"
* }
* ],
* "Projection": {
* "NonKeyAttributes": [ "string" ],
* "ProjectionType": "string"
* },
* "ProvisionedThroughput": {
* "ReadCapacityUnits": number,
* "WriteCapacityUnits": number
* }
* }
* ],
* "KeySchema": [
* {
* "AttributeName": "string",
* "KeyType": "string"
* }
* ],
* "LocalSecondaryIndexes": [
* {
* "IndexName": "string",
* "KeySchema": [
* {
* "AttributeName": "string",
* "KeyType": "string"
* }
* ],
* "Projection": {
* "NonKeyAttributes": [ "string" ],
* "ProjectionType": "string"
* }
* }
* ],
* "ProvisionedThroughput": {
* "ReadCapacityUnits": number,
* "WriteCapacityUnits": number
* },
* "SSESpecification": {
* "Enabled": boolean,
* "KMSMasterKeyId": "string",
* "SSEType": "string"
* },
* "StreamSpecification": {
* "StreamEnabled": boolean,
* "StreamViewType": "string"
* },
* "TableClass": "string",
* "TableName": "string",
* "Tags": [
* {
* "Key": "string",
* "Value": "string"
* }
* ]
* }
* }</pre>
*/
public class DDBCreateTableOpDispenser implements OpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<String> tableNameFunc;
private final LongFunction<Collection<KeySchemaElement>> keySchemaFunc;
private final LongFunction<Collection<AttributeDefinition>> attributeDefsFunc;
private final LongFunction<String> readCapacityFunc;
private final LongFunction<String> writeCapacityFunc;
private final LongFunction<String> billingModeFunc;
public DDBCreateTableOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
this.ddb = ddb;
this.tableNameFunc = l -> targetFunc.apply(l).toString();
this.keySchemaFunc = resolveKeySchemaFunction(cmd);
this.attributeDefsFunc = resolveAttributeDefinitionFunction(cmd);
this.billingModeFunc = cmd.getAsFunctionOr("BillingMode", BillingMode.PROVISIONED.name());
this.readCapacityFunc = cmd.getAsFunctionOr("ReadCapacityUnits", "10");
this.writeCapacityFunc = cmd.getAsFunctionOr("WriteCapacityUnits", "10");
}
@Override
public DDBCreateTableOp apply(long cycle) {
CreateTableRequest rq = new CreateTableRequest();
rq.setTableName(tableNameFunc.apply(cycle));
rq.setKeySchema(keySchemaFunc.apply(cycle));
rq.setAttributeDefinitions(attributeDefsFunc.apply(cycle));
rq.setBillingMode(BillingMode.valueOf(billingModeFunc.apply(cycle)).name());
if (rq.getBillingMode().equals(BillingMode.PROVISIONED.name())) {
rq.setProvisionedThroughput(
new ProvisionedThroughput(
Long.parseLong(readCapacityFunc.apply(cycle)),
Long.parseLong(writeCapacityFunc.apply(cycle)))
);
}
return new DDBCreateTableOp(ddb, rq);
}
private LongFunction<Collection<AttributeDefinition>> resolveAttributeDefinitionFunction(ParsedOp cmd) {
LongFunction<? extends Map> attrsmap = cmd.getAsRequiredFunction("Attributes", Map.class);
return (long l) -> {
List<AttributeDefinition> defs = new ArrayList<>();
attrsmap.apply(l).forEach((k, v) -> {
defs.add(new AttributeDefinition(k.toString(), ScalarAttributeType.valueOf(v.toString())));
});
return defs;
};
}
private LongFunction<Collection<KeySchemaElement>> resolveKeySchemaFunction(ParsedOp cmd) {
LongFunction<? extends Map> keysmap = cmd.getAsRequiredFunction("Keys", Map.class);
return (long l) -> {
List<KeySchemaElement> elems = new ArrayList<>();
keysmap.apply(l).forEach((k, v) -> {
elems.add(new KeySchemaElement(k.toString(), KeyType.valueOf(v.toString())));
});
return elems;
};
}
}

View File

@ -0,0 +1,63 @@
package io.nosqlbench.adapter.dynamodb.opdispensers;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import io.nosqlbench.adapter.dynamodb.optypes.DDBGetItemOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class DDBGetItemOpDispenser implements OpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<Table> targetTableFunction;
private final LongFunction<GetItemSpec> getItemSpecFunc;
public DDBGetItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunction) {
this.ddb = ddb;
this.targetTableFunction = l -> ddb.getTable(targetFunction.apply(l).toString());
this.getItemSpecFunc = resolveGetItemSpecFunction(cmd);
}
private LongFunction<GetItemSpec> resolveGetItemSpecFunction(ParsedOp cmd) {
PrimaryKey primaryKey = null;
LongFunction<PrimaryKey> pkfunc = null;
String projection = null;
LongFunction<String> projfunc = null;
LongFunction<? extends Map> keysmap_func = cmd.getAsRequiredFunction("key",Map.class);
LongFunction<PrimaryKey> pk_func = l -> {
PrimaryKey pk = new PrimaryKey();
keysmap_func.apply(l).forEach((k,v) -> {
pk.addComponent(k.toString(),v);
});
return pk;
};
Optional<LongFunction<String>> projection_func = cmd.getAsOptionalFunction("projection",String.class);
LongFunction<GetItemSpec> gis = l -> new GetItemSpec().withPrimaryKey(pk_func.apply(l));
if (projection_func.isPresent()) {
LongFunction<GetItemSpec> finalGis = gis;
gis = l -> {
LongFunction<String> pj = projection_func.get();
return finalGis.apply(l).withProjectionExpression(pj.apply(1));
};
}
return gis;
}
@Override
public DDBGetItemOp apply(long value) {
Table table = targetTableFunction.apply(value);
GetItemSpec getitemSpec = getItemSpecFunc.apply(value);
return new DDBGetItemOp(ddb, table, getitemSpec);
}
}

View File

@ -0,0 +1,40 @@
package io.nosqlbench.adapter.dynamodb.opdispensers;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import io.nosqlbench.adapter.dynamodb.optypes.DDBPutItemOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import java.util.Map;
import java.util.function.LongFunction;
public class DDBPutItemOpDispenser implements OpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<String> tableNameFunc;
private final LongFunction<? extends Item> itemfunc;
public DDBPutItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
this.ddb = ddb;
this.tableNameFunc = l -> targetFunc.apply(l).toString();
if (cmd.isDefined("item")) {
LongFunction<? extends Map> f1 = cmd.getAsRequiredFunction("item", Map.class);
this.itemfunc = l -> Item.fromMap(f1.apply(l));
} else if (cmd.isDefined("json")) {
LongFunction<? extends String> f1 = cmd.getAsRequiredFunction("json", String.class);
this.itemfunc = l -> Item.fromJSON(f1.apply(l));
} else {
throw new OpConfigError("PutItem op templates require either an 'item' map field or a 'json' text field");
}
}
@Override
public DynamoDBOp apply(long value) {
String tablename = tableNameFunc.apply(value);
Item item = itemfunc.apply(value);
return new DDBPutItemOp(ddb,tablename,item);
}
}

View File

@ -0,0 +1,34 @@
package io.nosqlbench.adapter.dynamodb.opdispensers;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.adapter.dynamodb.optypes.RawDynamodOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class RawDynamoDBOpDispenser implements OpDispenser<DynamoDBOp> {
private final LongFunction<? extends String> jsonFunction;
private final DynamoDB ddb;
public RawDynamoDBOpDispenser(DynamoDB ddb, ParsedOp cmd) {
this.ddb = ddb;
String bodytype = cmd.getValueType("body").getSimpleName();
switch (bodytype) {
case "String":
jsonFunction=cmd.getAsRequiredFunction("body");
break;
default:
throw new RuntimeException("Unable to create body mapping function from type '" + bodytype + "'");
}
}
@Override
public DynamoDBOp apply(long value) {
String body = jsonFunction.apply(value);
return new RawDynamodOp(ddb,body);
}
}

View File

@ -0,0 +1,20 @@
package io.nosqlbench.adapter.dynamodb.optypes;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
public class DDBCreateTableOp extends DynamoDBOp {
private final CreateTableRequest rq;
public DDBCreateTableOp(DynamoDB ddb, CreateTableRequest rq) {
super(ddb);
this.rq = rq;
}
@Override
public Table apply(long value) {
return ddb.createTable(rq);
}
}

View File

@ -0,0 +1,27 @@
package io.nosqlbench.adapter.dynamodb.optypes;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
/**
* @see <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html#API_GetItem_RequestSyntax">GetItem API</a>
* @see <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.Attributes.html">Expressions.Attributes</a>
*/
public class DDBGetItemOp extends DynamoDBOp {
private final Table table;
private GetItemSpec getItemSpec;
public DDBGetItemOp(DynamoDB ddb, Table table, GetItemSpec getItemSpec) {
super(ddb);
this.table = table;
this.getItemSpec = getItemSpec;
}
@Override
public Item apply(long value) {
Item result = table.getItem(getItemSpec);
return result;
}
}

View File

@ -0,0 +1,22 @@
package io.nosqlbench.adapter.dynamodb.optypes;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
public class DDBPutItemOp extends DynamoDBOp {
private final String tablename;
private final Item item;
public DDBPutItemOp(DynamoDB ddb, String tablename, Item item) {
super(ddb);
this.tablename = tablename;
this.item = item;
}
@Override
public PutItemOutcome apply(long value) {
PutItemOutcome outcome = ddb.getTable(tablename).putItem(item);
return outcome;
}
}

View File

@ -0,0 +1,17 @@
package io.nosqlbench.adapter.dynamodb.optypes;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
/**
* https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html?icmpid=docs_dynamodb_help_panel_hp_capacity#HowItWorks.ProvisionedThroughput.Manual
*/
public abstract class DynamoDBOp implements CycleOp<Object> {
protected DynamoDB ddb;
public DynamoDBOp(DynamoDB ddb) {
this.ddb = ddb;
}
}

View File

@ -0,0 +1,15 @@
package io.nosqlbench.adapter.dynamodb.optypes;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
public class RawDynamodOp extends DynamoDBOp {
public RawDynamodOp(DynamoDB ddb, String body) {
super(ddb);
}
@Override
public Table apply(long value) {
throw new RuntimeException("raw ops are not supported in this API yet");
}
}

View File

@ -6,7 +6,7 @@
<parent> <parent>
<artifactId>mvn-defaults</artifactId> <artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId> <groupId>io.nosqlbench</groupId>
<version>4.15.65-SNAPSHOT</version> <version>4.15.71-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath> <relativePath>../mvn-defaults</relativePath>
</parent> </parent>
@ -23,7 +23,7 @@
<dependency> <dependency>
<groupId>io.nosqlbench</groupId> <groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId> <artifactId>drivers-api</artifactId>
<version>4.15.65-SNAPSHOT</version> <version>4.15.71-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -64,8 +64,9 @@
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId> <artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.12</version> <version>1.12.129</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.xml.bind</groupId> <groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId> <artifactId>jaxb-api</artifactId>

View File

@ -83,6 +83,15 @@
<module>adapter-cqld4</module> <module>adapter-cqld4</module>
</modules> </modules>
</profile> </profile>
<profile>
<id>with-dynamodb</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
<module>adapter-dynamodb</module>
</modules>
</profile>
<profile> <profile>
<id>with-mongodb</id> <id>with-mongodb</id>
<activation> <activation>