ddb improvements

This commit is contained in:
Jonathan Shook
2021-12-21 13:08:34 -06:00
parent 7deb11b9ef
commit 8a397a6552
10 changed files with 275 additions and 35 deletions

View File

@@ -3,5 +3,6 @@ package io.nosqlbench.adapter.dynamodb;
public enum DynamoDBCmdType {
CreateTable,
PutItem,
GetItem;
GetItem,
Query
}

View File

@@ -5,14 +5,14 @@ import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.annotations.Stability;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "dynamodb", maturity = Stability.Experimental)
@Service(value = DriverAdapter.class, selector = "dynamodb", maturity = Maturity.Experimental)
public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoDBSpace> {
@Override

View File

@@ -4,6 +4,7 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBCreateTableOpDispenser;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBGetItemOpDispenser;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBPutItemOpDispenser;
import io.nosqlbench.adapter.dynamodb.opdispensers.DDBQueryOpDispenser;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
@@ -40,11 +41,14 @@ public class DynamoDBOpMapper implements OpMapper<DynamoDBOp> {
NamedTarget<DynamoDBCmdType> cmdType = cmd.getRequiredTypeFromEnum(DynamoDBCmdType.class);
switch (cmdType.enumId) {
case CreateTable:
return new DDBCreateTableOpDispenser(ddb,cmd,cmdType.targetFunction);
return new DDBCreateTableOpDispenser(ddb, cmd, cmdType.targetFunction);
case PutItem:
return new DDBPutItemOpDispenser(ddb,cmd,cmdType.targetFunction);
return new DDBPutItemOpDispenser(ddb, cmd, cmdType.targetFunction);
case GetItem:
return new DDBGetItemOpDispenser(ddb,cmd,cmdType.targetFunction);
return new DDBGetItemOpDispenser(ddb, cmd, cmdType.targetFunction);
case Query:
return new DDBQueryOpDispenser(ddb, cmd, cmdType.targetFunction);
default:
throw new OpConfigError("No implementation for " + cmdType.toString());
}

View File

@@ -4,7 +4,7 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.*;
import io.nosqlbench.adapter.dynamodb.optypes.DDBCreateTableOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.ArrayList;
@@ -88,7 +88,7 @@ import java.util.function.LongFunction;
* }
* }</pre>
*/
public class DDBCreateTableOpDispenser implements OpDispenser<DynamoDBOp> {
public class DDBCreateTableOpDispenser extends BaseOpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<String> tableNameFunc;
@@ -99,6 +99,7 @@ public class DDBCreateTableOpDispenser implements OpDispenser<DynamoDBOp> {
private final LongFunction<String> billingModeFunc;
public DDBCreateTableOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
super(cmd);
this.ddb = ddb;
this.tableNameFunc = l -> targetFunc.apply(l).toString();
this.keySchemaFunc = resolveKeySchemaFunction(cmd);

View File

@@ -6,19 +6,20 @@ import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import io.nosqlbench.adapter.dynamodb.optypes.DDBGetItemOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class DDBGetItemOpDispenser implements OpDispenser<DynamoDBOp> {
public class DDBGetItemOpDispenser extends BaseOpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<Table> targetTableFunction;
private final LongFunction<GetItemSpec> getItemSpecFunc;
public DDBGetItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunction) {
super(cmd);
this.ddb = ddb;
this.targetTableFunction = l -> ddb.getTable(targetFunction.apply(l).toString());
this.getItemSpecFunc = resolveGetItemSpecFunction(cmd);

View File

@@ -4,20 +4,21 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import io.nosqlbench.adapter.dynamodb.optypes.DDBPutItemOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import java.util.Map;
import java.util.function.LongFunction;
public class DDBPutItemOpDispenser implements OpDispenser<DynamoDBOp> {
public class DDBPutItemOpDispenser extends BaseOpDispenser<DynamoDBOp> {
private final DynamoDB ddb;
private final LongFunction<String> tableNameFunc;
private final LongFunction<? extends Item> itemfunc;
public DDBPutItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
super(cmd);
this.ddb = ddb;
this.tableNameFunc = l -> targetFunc.apply(l).toString();
if (cmd.isDefined("item")) {

View File

@@ -0,0 +1,181 @@
package io.nosqlbench.adapter.dynamodb.opdispensers;

import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import io.nosqlbench.adapter.dynamodb.optypes.DDBQueryOp;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;

import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;

/**
 * Dispenses {@link DDBQueryOp} instances for the DynamoDB Query API.
 * The target of the op template names the table; the optional fields below
 * decorate a per-cycle {@link QuerySpec}.
 *
 * <pre>{@code
 * {
 *    "AttributesToGet": [ "string" ],
 *    "ConditionalOperator": "string",
 *    "ConsistentRead": boolean,
 *    "ExclusiveStartKey": {
 *       "string" : {
 *          "B": blob,
 *          "BOOL": boolean,
 *          "BS": [ blob ],
 *          "L": [
 *             "AttributeValue"
 *          ],
 *          "M": {
 *             "string" : "AttributeValue"
 *          },
 *          "N": "string",
 *          "NS": [ "string" ],
 *          "NULL": boolean,
 *          "S": "string",
 *          "SS": [ "string" ]
 *       }
 *    },
 *    "ExpressionAttributeNames": {
 *       "string" : "string"
 *    },
 *    "ExpressionAttributeValues": {
 *       "string" : {
 *          "B": blob,
 *          "BOOL": boolean,
 *          "BS": [ blob ],
 *          "L": [
 *             "AttributeValue"
 *          ],
 *          "M": {
 *             "string" : "AttributeValue"
 *          },
 *          "N": "string",
 *          "NS": [ "string" ],
 *          "NULL": boolean,
 *          "S": "string",
 *          "SS": [ "string" ]
 *       }
 *    },
 *    "FilterExpression": "string",
 *    "IndexName": "string",
 *    "KeyConditionExpression": "string",
 *    "KeyConditions": {
 *       "string" : {
 *          "AttributeValueList": [
 *             {
 *                "B": blob,
 *                "BOOL": boolean,
 *                "BS": [ blob ],
 *                "L": [
 *                   "AttributeValue"
 *                ],
 *                "M": {
 *                   "string" : "AttributeValue"
 *                },
 *                "N": "string",
 *                "NS": [ "string" ],
 *                "NULL": boolean,
 *                "S": "string",
 *                "SS": [ "string" ]
 *             }
 *          ],
 *          "ComparisonOperator": "string"
 *       }
 *    },
 *    "Limit": number,
 *    "ProjectionExpression": "string",
 *    "QueryFilter": {
 *       "string" : {
 *          "AttributeValueList": [
 *             {
 *                "B": blob,
 *                "BOOL": boolean,
 *                "BS": [ blob ],
 *                "L": [
 *                   "AttributeValue"
 *                ],
 *                "M": {
 *                   "string" : "AttributeValue"
 *                },
 *                "N": "string",
 *                "NS": [ "string" ],
 *                "NULL": boolean,
 *                "S": "string",
 *                "SS": [ "string" ]
 *             }
 *          ],
 *          "ComparisonOperator": "string"
 *       }
 *    },
 *    "ReturnConsumedCapacity": "string",
 *    "ScanIndexForward": boolean,
 *    "Select": "string",
 *    "TableName": "string"
 * }
 * }</pre>
 */
public class DDBQueryOpDispenser extends BaseOpDispenser<DynamoDBOp> {

    private final DynamoDB ddb;
    private final LongFunction<Table> tableFunc;
    private final LongFunction<QuerySpec> querySpecFunc;

    /**
     * @param ddb        the shared DynamoDB document client
     * @param cmd        the parsed op template
     * @param targetFunc per-cycle source of the target table name
     */
    public DDBQueryOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
        super(cmd);
        this.ddb = ddb;
        // The table name may itself be a binding, so resolve it per cycle.
        LongFunction<String> tableNameFunc = l -> targetFunc.apply(l).toString();
        this.tableFunc = l -> ddb.getTable(tableNameFunc.apply(l));
        this.querySpecFunc = resolveQuerySpecFunc(cmd);
    }

    @Override
    public DDBQueryOp apply(long cycle) {
        Table table = tableFunc.apply(cycle);
        QuerySpec queryspec = querySpecFunc.apply(cycle);
        return new DDBQueryOp(ddb, table, queryspec);
    }

    /**
     * Build a per-cycle {@link QuerySpec} factory by starting from an empty spec
     * and layering a decoration for each optional field present in the template.
     * Fields not present in the template cost nothing at runtime.
     */
    private LongFunction<QuerySpec> resolveQuerySpecFunc(ParsedOp cmd) {
        LongFunction<QuerySpec> func = l -> new QuerySpec();

        Optional<LongFunction<String>> projFunc = cmd.getAsOptionalFunction("projection", String.class);
        if (projFunc.isPresent()) {
            LongFunction<QuerySpec> finalFunc = func;
            LongFunction<String> af = projFunc.get();
            // BUGFIX: "projection" is a comma-separated list like "data0, data1".
            // withAttributesToGet takes String... attribute names, so the list must be
            // split into individual names; passing the whole string would request a
            // single nonexistent attribute named "data0, data1".
            func = l -> finalFunc.apply(l).withAttributesToGet(af.apply(l).trim().split("\\s*,\\s*"));
        }

        Optional<LongFunction<Boolean>> consistentRead = cmd.getAsOptionalFunction("ConsistentRead", boolean.class);
        if (consistentRead.isPresent()) {
            LongFunction<QuerySpec> finalFunc = func;
            LongFunction<Boolean> consistentReadFunc = consistentRead.get();
            func = l -> finalFunc.apply(l).withConsistentRead(consistentReadFunc.apply(l));
        }

        Optional<LongFunction<Map>> exclStrtKeyFunc = cmd.getAsOptionalFunction("ExclusiveStartKey", Map.class);
        if (exclStrtKeyFunc.isPresent()) {
            LongFunction<QuerySpec> finalFunc = func;
            LongFunction<Map> skf = exclStrtKeyFunc.get();
            // Convert the provided map into a PrimaryKey, one component per entry.
            LongFunction<PrimaryKey> pkf = l -> {
                PrimaryKey pk = new PrimaryKey();
                skf.apply(l).forEach((k, v) -> pk.addComponent(k.toString(), v.toString()));
                return pk;
            };
            func = l -> finalFunc.apply(l).withExclusiveStartKey(pkf.apply(l));
        }

        Optional<LongFunction<Integer>> limitFunc = cmd.getAsOptionalFunction("Limit", Integer.class);
        if (limitFunc.isPresent()) {
            LongFunction<Integer> limitf = limitFunc.get();
            LongFunction<QuerySpec> finalFunc = func;
            func = l -> finalFunc.apply(l).withMaxResultSize(limitf.apply(l));
        }

        return func;
    }
}

View File

@@ -12,6 +12,7 @@ import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
public class DDBGetItemOp extends DynamoDBOp {
private final Table table;
private GetItemSpec getItemSpec;
private long resultSize=-1;
public DDBGetItemOp(DynamoDB ddb, Table table, GetItemSpec getItemSpec) {
super(ddb);
@@ -22,6 +23,12 @@ public class DDBGetItemOp extends DynamoDBOp {
@Override
public Item apply(long value) {
Item result = table.getItem(getItemSpec);
resultSize=result.numberOfAttributes();
return result;
}
@Override
public long getResultSize() {
return resultSize;
}
}

View File

@@ -0,0 +1,32 @@
package io.nosqlbench.adapter.dynamodb.optypes;

import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;

/**
 * A single DynamoDB Query operation against a pre-resolved {@link Table},
 * parameterized by a prepared {@link QuerySpec}. Each invocation of
 * {@link #apply(long)} issues the query and records a result-size metric.
 */
public class DDBQueryOp extends DynamoDBOp {

    private final Table table;
    private final QuerySpec querySpec;
    // Remains -1 until apply() has executed at least once.
    private long resultSize = -1;

    public DDBQueryOp(DynamoDB ddb, Table table, QuerySpec querySpec) {
        super(ddb);
        this.table = table;
        this.querySpec = querySpec;
    }

    @Override
    public ItemCollection<QueryOutcome> apply(long value) {
        ItemCollection<QueryOutcome> outcome = table.query(querySpec);
        // NOTE(review): ItemCollection is lazily paged; getAccumulatedItemCount()
        // may still be 0 here if no page has been fetched before this call returns.
        // Confirm against the SDK version in use before trusting this metric.
        this.resultSize = outcome.getAccumulatedItemCount();
        return outcome;
    }

    @Override
    public long getResultSize() {
        return resultSize;
    }
}

View File

@@ -3,6 +3,7 @@ scenarios:
rampup: run driver=dynamodb tags=phase:rampup region=us-east-1
read: run driver=dynamodb tags=phase:read region=us-east-1
main: run driver=dynamodb tags=phase:main region=us-east-1
read01: run driver=dynamodb tags=stmt:read01 region=us-east-1
bindings:
@@ -31,6 +32,8 @@ bindings:
part_read: Uniform(0,<<partcount:100>>)->int; ToString() -> String
clust_read: Add(1); Uniform(0,<<partsize:1000000>>)->int; ToString() -> String
params:
instrument: true
blocks:
- name: schema
@@ -78,20 +81,11 @@ blocks:
ops:
read-all:
op:
PutItem: TEMPLATE(table,tabular)
json: |
{
"part": "{part_layout}",
"clust": "{clust_layout}",
"data0": "{data0}",
"data1": "{data1}",
"data2": "{data2}",
"data3": "{data3}",
"data4": "{data4}",
"data5": "{data5}",
"data6": "{data6}",
"data7": "{data7}"
}
GetItem: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
- name: main
tags:
phase: main
@@ -118,64 +112,82 @@ blocks:
}
main-read-all:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
# no projection means "all" implicitly
Limit: "{limit}"
# no attributes means "all" implicitly
main-read-01:
tags:
stmt: read01
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data0, data1
Limit: "{limit}"
main-read-0246:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data0, data2, data4, data6
Limit: "{limit}"
main-read-1357:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data1, data3, data5, data7
Limit: "{limit}"
main-read-0123:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data0, data1, data2, data3
Limit: "{limit}"
main-read-4567:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data4, data5, data6, data7
Limit: "{limit}"
main-read-67:
op:
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data6, data7
Limit: "{limit}"
main-read-01234567:
op:
GetItem: TEMPLATE(table,tabular)
Query: TEMPLATE(table,tabular)
key:
part: "{part_read}"
clust: "{clust_read}"
ConsistentRead: true
projection: data0, data1, data2, data3, data4, data5, data6, data7
Limit: "{limit}"