Add MongoDB support

The Basics
- insert
- find
This commit is contained in:
Justin Chu 2020-05-18 12:33:08 -04:00
parent 3490578b20
commit 9cf6cd9953
11 changed files with 537 additions and 0 deletions

49
driver-mongodb/pom.xml Normal file
View File

@ -0,0 +1,49 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>driver-mongodb</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.117-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
An nosqlbench ActivityType (AT) driver module;
MongoDB
</description>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.117-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.4</version>
</dependency>
<!-- test scope only -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,64 @@
package io.nosqlbench.driver.mongodb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.bson.Document;
import org.bson.conversions.Bson;
public class MongoAction implements SyncAction {
private final static Logger logger = LoggerFactory.getLogger(MongoAction.class);
private final MongoActivity activity;
private final int slot;
private OpSequence<ReadyMongoStatement> sequencer;
public MongoAction(MongoActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
}
@Override
public void init() {
this.sequencer = activity.getOpSequencer();
}
@Override
public int runCycle(long cycleValue) {
ReadyMongoStatement rms;
Bson queryBson;
try (Timer.Context bindTime = activity.bindTimer.time()) {
rms = sequencer.get(cycleValue);
queryBson = rms.bind(cycleValue);
if (activity.isShowQuery()) {
logger.info("Query(cycle={}):\n{}", cycleValue, queryBson);
}
}
for (int i = 0; i < activity.getMaxTries(); i++) {
activity.triesHisto.update(i);
try (Timer.Context executeTime = activity.executeTimer.time()) {
MongoDatabase database = activity.getDatabase();
ReadPreference readPreference = rms.getReadPreference();
Document resultDoc = database.runCommand(queryBson, readPreference);
double ok = resultDoc.getDouble("ok");
activity.resultSetSizeHisto.update(resultDoc.getInteger("n", 0));
return ok == 1.0d ? 0 : 1;
} catch (Exception e) {
logger.error("Failed to runCommand {} on cycle {}, tries {}", queryBson, cycleValue, i, e);
}
}
throw new RuntimeException(String.format("Exhausted max tries (%s) on cycle %s",
cycleValue, activity.getMaxTries()));
}
}

View File

@ -0,0 +1,137 @@
package io.nosqlbench.driver.mongodb;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.engine.api.util.TagFilter;
public class MongoActivity extends SimpleActivity implements ActivityDefObserver {
private final static Logger logger = LoggerFactory.getLogger(MongoActivity.class);
private String yamlLoc;
private String connectionString;
private String databaseName;
private MongoClient client;
private MongoDatabase mongoDatabase;
private boolean showQuery;
private int maxTries;
private OpSequence<ReadyMongoStatement> opSequence;
protected Timer bindTimer;
protected Timer executeTimer;
protected Histogram resultSetSizeHisto;
protected Histogram triesHisto;
public MongoActivity(ActivityDef activityDef) {
super(activityDef);
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
// sanity check
yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
.orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
connectionString = activityDef.getParams().getOptionalString("connection")
.orElseThrow(() -> new IllegalArgumentException("connection is not defined"));
// TODO: support multiple databases
databaseName = activityDef.getParams().getOptionalString("database")
.orElseThrow(() -> new IllegalArgumentException("database is not defined"));
}
@Override
public void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
onActivityDefUpdate(activityDef);
opSequence = initOpSequencer();
setDefaultsFromOpSequence(opSequence);
client = MongoClients.create(connectionString);
mongoDatabase = client.getDatabase(databaseName);
showQuery = activityDef.getParams().getOptionalBoolean("showquery")
.orElse(false);
maxTries = activityDef.getParams().getOptionalInteger("maxtries")
.orElse(10);
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size");
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
}
@Override
public void shutdownActivity() {
logger.debug("shutting down activity: " + this.activityDef.getAlias());
if (client != null) {
client.close();
}
}
OpSequence<ReadyMongoStatement> initOpSequencer() {
SequencerType sequencerType = SequencerType.valueOf(
activityDef.getParams().getOptionalString("seq").orElse("bucket")
);
SequencePlanner<ReadyMongoStatement> sequencer = new SequencePlanner<>(sequencerType);
StmtsDocList stmtsDocList = StatementsLoader.load(logger, yamlLoc, new StrInterpolator(activityDef), "activities");
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
TagFilter tagFilter = new TagFilter(tagfilter);
stmtsDocList.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog()));
List<StmtDef> stmts = stmtsDocList.getStmts(tagfilter);
for (StmtDef stmt : stmts) {
ParsedStmt parsed = stmt.getParsed().orError();
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
sequencer.addOp(new ReadyMongoStatement(stmt),
Long.parseLong(stmt.getParams().getOrDefault("ratio","1")));
}
return sequencer.resolve();
}
protected MongoDatabase getDatabase() {
return mongoDatabase;
}
protected OpSequence<ReadyMongoStatement> getOpSequencer() {
return opSequence;
}
protected boolean isShowQuery() {
return showQuery;
}
protected int getMaxTries() {
return maxTries;
}
}

View File

@ -0,0 +1,41 @@
package io.nosqlbench.driver.mongodb;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(ActivityType.class)
public class MongoActivityType implements ActivityType<MongoActivity> {
@Override
public String getName() {
return "mongodb";
}
@Override
public MongoActivity getActivity(ActivityDef activityDef) {
return new MongoActivity(activityDef);
}
@Override
public ActionDispenser getActionDispenser(MongoActivity activity) {
return new MongoActionDispenser(activity);
}
private static class MongoActionDispenser implements ActionDispenser {
private final MongoActivity activity;
public MongoActionDispenser(MongoActivity activity)
{
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new MongoAction(activity, slot);
}
}
}

View File

@ -0,0 +1,34 @@
package io.nosqlbench.driver.mongodb;
import com.mongodb.ReadPreference;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.bson.Document;
import org.bson.conversions.Bson;
public class ReadyMongoStatement {
private StringBindings bindings;
private ReadPreference readPreference;
public ReadyMongoStatement(StmtDef stmtDef) {
ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt(), stmtDef.getBindings());
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt(), paramBindings);
this.bindings = template.resolve();
this.readPreference = ReadPreference.valueOf(stmtDef.getParams()
.getOrDefault("readPreference","primary"));
}
public ReadPreference getReadPreference() {
return readPreference;
}
public Bson bind(long value) {
return Document.parse(bindings.bind(value));
}
}

View File

@ -0,0 +1,57 @@
# nb -v run driver=mongodb yaml=mongodb-basic connection=mongodb://127.0.0.1 database=testdb tags=phase:rampup
scenarios:
default:
- run driver=mongodb tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=mongodb tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
blocks:
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: |
{
insert: "<<collection:keyvalue>>",
documents: [ { key: {seq_key},
value: {seq_value} } ]
}
params:
readPreference: primary
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:5>>
statements:
- main-find: |
{
find: "<<collection:keyvalue>>",
filter: { key: {rw_key} }
}
params:
readPreference: primary
tags:
name: main-find
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:5>>
statements:
- main-insert: |
{
insert: "<<collection:keyvalue>>",
documents: [ { key: {rw_key},
value: {rw_value} } ]
}
tags:
name: main-insert

View File

@ -0,0 +1,3 @@
# MongoDB Driver
This is a driver for MongoDB.

View File

@ -0,0 +1,26 @@
package io.nosqlbench.driver.mongodb;
import org.junit.Test;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import static org.assertj.core.api.Assertions.assertThat;
public class MongoActivityTest {
@Test
public void testInitOpSequencer() {
String[] params = {
"yaml=activities/mongodb-basic.yaml",
"connection=mongodb://127.0.0.1",
"database=nosqlbench_testdb"
};
ActivityDef activityDef = ActivityDef.parseActivityDef(String.join(";", params));
MongoActivity mongoActivity = new MongoActivity(activityDef);
mongoActivity.initActivity();
OpSequence<ReadyMongoStatement> sequence = mongoActivity.initOpSequencer();
assertThat(sequence.getOps()).hasSize(3);
}
}

View File

@ -0,0 +1,107 @@
package io.nosqlbench.driver.mongodb;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.virtdata.core.templates.BindPoint;
import org.bson.conversions.Bson;
import static org.assertj.core.api.Assertions.assertThat;
public class ReadyMongoStatementTest {
private final static Logger logger = LoggerFactory.getLogger(ReadyMongoStatementTest.class);
private ActivityDef activityDef;
private StmtsDocList stmtsDocList;
@Before
public void setup() {
String[] params = {
"yaml=activities/mongodb-basic.yaml",
"database=nosqlbench_testdb",
};
activityDef = ActivityDef.parseActivityDef(String.join(";", params));
String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
stmtsDocList = StatementsLoader.load(logger, yaml_loc, new StrInterpolator(activityDef), "activities");
}
@Test
public void testResolvePhaseRampup() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:rampup");
List<StmtDef> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (StmtDef stmt : stmts) {
ParsedStmt parsed = stmt.getParsed().orError();
assertThat(parsed.getBindPoints()).hasSize(2);
BindPoint seqKey = new BindPoint("seq_key", "Mod(1000000000); ToString() -> String");
BindPoint seqValue = new BindPoint("seq_value", "Hash(); Mod(1000000000); ToString() -> String");
assertThat(parsed.getBindPoints()).containsExactly(seqKey, seqValue);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
@Test
public void testResolvePhaseMainRead() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:main-find");
List<StmtDef> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (StmtDef stmt : stmts) {
ParsedStmt parsed = stmt.getParsed().orError();
assertThat(parsed.getBindPoints()).hasSize(1);
BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000000)->int; ToString() -> String");
assertThat(parsed.getBindPoints()).containsExactly(rwKey);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
@Test
public void testResolvePhaseMainWrite() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:main-insert");
List<StmtDef> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (StmtDef stmt : stmts) {
ParsedStmt parsed = stmt.getParsed().orError();
assertThat(parsed.getBindPoints()).hasSize(2);
BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000000)->int; ToString() -> String");
BindPoint rwValue = new BindPoint("rw_value", "Hash(); Uniform(0,1000000000)->int; ToString() -> String");
assertThat(parsed.getBindPoints()).containsExactly(rwKey, rwValue);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
}

View File

@ -98,6 +98,11 @@
<version>3.12.117-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>3.12.117-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.nosqlbench</groupId>-->
@ -245,6 +250,19 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>with-mongodb</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>3.12.117-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>build-nb-appimage</id>
<activation>

View File

@ -44,6 +44,7 @@
<module>driver-cqlverify</module>
<module>driver-web</module>
<module>driver-kafka</module>
<module>driver-mongodb</module>
<!-- VIRTDATA MODULES -->