From 9cf6cd9953b045e0a33422e9c2107d7aa3c308f0 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Mon, 18 May 2020 12:33:08 -0400 Subject: [PATCH] Add MongoDB support The Basics - insert - find --- driver-mongodb/pom.xml | 49 +++++++ .../driver/mongodb/MongoAction.java | 64 ++++++++ .../driver/mongodb/MongoActivity.java | 137 ++++++++++++++++++ .../driver/mongodb/MongoActivityType.java | 41 ++++++ .../driver/mongodb/ReadyMongoStatement.java | 34 +++++ .../resources/activities/mongodb-basic.yaml | 57 ++++++++ driver-mongodb/src/main/resources/mongodb.md | 3 + .../driver/mongodb/MongoActivityTest.java | 26 ++++ .../mongodb/ReadyMongoStatementTest.java | 107 ++++++++++++++ nb/pom.xml | 18 +++ pom.xml | 1 + 11 files changed, 537 insertions(+) create mode 100644 driver-mongodb/pom.xml create mode 100644 driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoAction.java create mode 100644 driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivity.java create mode 100644 driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivityType.java create mode 100644 driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/ReadyMongoStatement.java create mode 100644 driver-mongodb/src/main/resources/activities/mongodb-basic.yaml create mode 100644 driver-mongodb/src/main/resources/mongodb.md create mode 100644 driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/MongoActivityTest.java create mode 100644 driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/ReadyMongoStatementTest.java diff --git a/driver-mongodb/pom.xml b/driver-mongodb/pom.xml new file mode 100644 index 000000000..634c21e29 --- /dev/null +++ b/driver-mongodb/pom.xml @@ -0,0 +1,49 @@ + + 4.0.0 + + driver-mongodb + jar + + + mvn-defaults + io.nosqlbench + 3.12.117-SNAPSHOT + ../mvn-defaults + + + ${project.artifactId} + + An nosqlbench ActivityType (AT) driver module; + MongoDB + + + + + io.nosqlbench + engine-api + 3.12.117-SNAPSHOT + + + + org.mongodb + mongo-java-driver + 3.12.4 + + + + + + org.assertj + assertj-core + test + + + + junit + junit + test + + + + + diff --git a/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoAction.java b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoAction.java new file mode 100644 index 000000000..d6242889b --- /dev/null +++ b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoAction.java @@ -0,0 +1,64 @@ +package io.nosqlbench.driver.mongodb; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Timer; +import com.mongodb.ReadPreference; +import com.mongodb.client.MongoDatabase; +import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; +import org.bson.Document; +import org.bson.conversions.Bson; + +public class MongoAction implements SyncAction { + + private final static Logger logger = LoggerFactory.getLogger(MongoAction.class); + + private final MongoActivity activity; + private final int slot; + + private OpSequence sequencer; + + public MongoAction(MongoActivity activity, int slot) { + this.activity = activity; + this.slot = slot; + } + + @Override + public void init() { + this.sequencer = activity.getOpSequencer(); + } + + @Override + public int runCycle(long cycleValue) { + ReadyMongoStatement rms; + Bson queryBson; + try (Timer.Context bindTime = activity.bindTimer.time()) { + rms = sequencer.get(cycleValue); + queryBson = rms.bind(cycleValue); + + if (activity.isShowQuery()) { + logger.info("Query(cycle={}):\n{}", cycleValue, queryBson); + } + } + + for (int i = 0; i < activity.getMaxTries(); i++) { + activity.triesHisto.update(i); + try (Timer.Context executeTime = activity.executeTimer.time()) { + MongoDatabase database = activity.getDatabase(); + ReadPreference readPreference = rms.getReadPreference(); + Document resultDoc = database.runCommand(queryBson, readPreference); + + double ok = resultDoc.getDouble("ok"); + activity.resultSetSizeHisto.update(resultDoc.getInteger("n", 0)); + return ok == 1.0d ? 0 : 1; + } catch (Exception e) { + logger.error("Failed to runCommand {} on cycle {}, tries {}", queryBson, cycleValue, i, e); + } + } + + throw new RuntimeException(String.format("Exhausted max tries (%s) on cycle %s", + cycleValue, activity.getMaxTries())); + } +} diff --git a/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivity.java b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivity.java new file mode 100644 index 000000000..c65cd14dc --- /dev/null +++ b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivity.java @@ -0,0 +1,137 @@ +package io.nosqlbench.driver.mongodb; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; +import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; +import io.nosqlbench.engine.api.activityapi.planning.SequencerType; +import io.nosqlbench.engine.api.activityconfig.ParsedStmt; +import io.nosqlbench.engine.api.activityconfig.StatementsLoader; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList; +import io.nosqlbench.engine.api.activityimpl.ActivityDef; +import io.nosqlbench.engine.api.activityimpl.SimpleActivity; +import io.nosqlbench.engine.api.metrics.ActivityMetrics; +import io.nosqlbench.engine.api.templating.StrInterpolator; +import io.nosqlbench.engine.api.util.TagFilter; + +public class MongoActivity extends SimpleActivity implements ActivityDefObserver { + + private final static Logger logger = LoggerFactory.getLogger(MongoActivity.class); + + private String yamlLoc; + private String connectionString; + private String databaseName; + + private MongoClient client; + private MongoDatabase mongoDatabase; + private boolean showQuery; + private int maxTries; + + private OpSequence opSequence; + + protected Timer bindTimer; + protected Timer executeTimer; + protected Histogram resultSetSizeHisto; + protected Histogram triesHisto; + + public MongoActivity(ActivityDef activityDef) { + super(activityDef); + } + + @Override + public synchronized void onActivityDefUpdate(ActivityDef activityDef) { + super.onActivityDefUpdate(activityDef); + + // sanity check + yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload") + .orElseThrow(() -> new IllegalArgumentException("yaml is not defined")); + connectionString = activityDef.getParams().getOptionalString("connection") + .orElseThrow(() -> new IllegalArgumentException("connection is not defined")); + // TODO: support multiple databases + databaseName = activityDef.getParams().getOptionalString("database") + .orElseThrow(() -> new IllegalArgumentException("database is not defined")); + } + + @Override + public void initActivity() { + logger.debug("initializing activity: " + this.activityDef.getAlias()); + onActivityDefUpdate(activityDef); + + opSequence = initOpSequencer(); + setDefaultsFromOpSequence(opSequence); + + client = MongoClients.create(connectionString); + mongoDatabase = client.getDatabase(databaseName); + showQuery = activityDef.getParams().getOptionalBoolean("showquery") + .orElse(false); + maxTries = activityDef.getParams().getOptionalInteger("maxtries") + .orElse(10); + + bindTimer = ActivityMetrics.timer(activityDef, "bind"); + executeTimer = ActivityMetrics.timer(activityDef, "execute"); + resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size"); + triesHisto = ActivityMetrics.histogram(activityDef, "tries"); + } + + @Override + public void shutdownActivity() { + logger.debug("shutting down activity: " + this.activityDef.getAlias()); + if (client != null) { + client.close(); + } + } + + OpSequence initOpSequencer() { + SequencerType sequencerType = SequencerType.valueOf( + activityDef.getParams().getOptionalString("seq").orElse("bucket") + ); + SequencePlanner sequencer = new SequencePlanner<>(sequencerType); + + StmtsDocList stmtsDocList = StatementsLoader.load(logger, yamlLoc, new StrInterpolator(activityDef), "activities"); + + String tagfilter = activityDef.getParams().getOptionalString("tags").orElse(""); + + TagFilter tagFilter = new TagFilter(tagfilter); + stmtsDocList.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog())); + + List stmts = stmtsDocList.getStmts(tagfilter); + for (StmtDef stmt : stmts) { + ParsedStmt parsed = stmt.getParsed().orError(); + String statement = parsed.getPositionalStatement(Function.identity()); + Objects.requireNonNull(statement); + + sequencer.addOp(new ReadyMongoStatement(stmt), + Long.parseLong(stmt.getParams().getOrDefault("ratio","1"))); + } + + return sequencer.resolve(); + } + + protected MongoDatabase getDatabase() { + return mongoDatabase; + } + + protected OpSequence getOpSequencer() { + return opSequence; + } + + protected boolean isShowQuery() { + return showQuery; + } + + protected int getMaxTries() { + return maxTries; + } +} diff --git a/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivityType.java b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivityType.java new file mode 100644 index 000000000..b37f60c02 --- /dev/null +++ b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/MongoActivityType.java @@ -0,0 +1,41 @@ +package io.nosqlbench.driver.mongodb; + +import io.nosqlbench.engine.api.activityapi.core.Action; +import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; +import io.nosqlbench.engine.api.activityapi.core.ActivityType; +import io.nosqlbench.engine.api.activityimpl.ActivityDef; +import io.nosqlbench.nb.annotations.Service; + +@Service(ActivityType.class) +public class MongoActivityType implements ActivityType { + + @Override + public String getName() { + return "mongodb"; + } + + @Override + public MongoActivity getActivity(ActivityDef activityDef) { + return new MongoActivity(activityDef); + } + + @Override + public ActionDispenser getActionDispenser(MongoActivity activity) { + return new MongoActionDispenser(activity); + } + + private static class MongoActionDispenser implements ActionDispenser { + + private final MongoActivity activity; + + public MongoActionDispenser(MongoActivity activity) + { + this.activity = activity; + } + + @Override + public Action getAction(int slot) { + return new MongoAction(activity, slot); + } + } +} diff --git a/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/ReadyMongoStatement.java b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/ReadyMongoStatement.java new file mode 100644 index 000000000..9c974d51e --- /dev/null +++ b/driver-mongodb/src/main/java/io/nosqlbench/driver/mongodb/ReadyMongoStatement.java @@ -0,0 +1,34 @@ +package io.nosqlbench.driver.mongodb; + +import com.mongodb.ReadPreference; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef; +import io.nosqlbench.virtdata.core.bindings.BindingsTemplate; +import io.nosqlbench.virtdata.core.templates.ParsedTemplate; +import io.nosqlbench.virtdata.core.templates.StringBindings; +import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate; +import org.bson.Document; +import org.bson.conversions.Bson; + +public class ReadyMongoStatement { + + private StringBindings bindings; + private ReadPreference readPreference; + + public ReadyMongoStatement(StmtDef stmtDef) { + ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt(), stmtDef.getBindings()); + BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints()); + StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt(), paramBindings); + + this.bindings = template.resolve(); + this.readPreference = ReadPreference.valueOf(stmtDef.getParams() + .getOrDefault("readPreference","primary")); + } + + public ReadPreference getReadPreference() { + return readPreference; + } + + public Bson bind(long value) { + return Document.parse(bindings.bind(value)); + } +} diff --git a/driver-mongodb/src/main/resources/activities/mongodb-basic.yaml b/driver-mongodb/src/main/resources/activities/mongodb-basic.yaml new file mode 100644 index 000000000..4ba58dbd2 --- /dev/null +++ b/driver-mongodb/src/main/resources/activities/mongodb-basic.yaml @@ -0,0 +1,57 @@ +# nb -v run driver=mongodb yaml=mongodb-basic connection=mongodb://127.0.0.1 database=testdb tags=phase:rampup +scenarios: + default: + - run driver=mongodb tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto + - run driver=mongodb tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto +bindings: + seq_key: Mod(<>); ToString() -> String + seq_value: Hash(); Mod(<>); ToString() -> String + rw_key: <int>>; ToString() -> String + rw_value: Hash(); <int>>; ToString() -> String + +blocks: + - name: rampup + tags: + phase: rampup + statements: + - rampup-insert: | + { + insert: "<>", + documents: [ { key: {seq_key}, + value: {seq_value} } ] + } + params: + readPreference: primary + tags: + name: rampup-insert + - name: main-read + tags: + phase: main + type: read + params: + ratio: <> + statements: + - main-find: | + { + find: "<>", + filter: { key: {rw_key} } + } + params: + readPreference: primary + tags: + name: main-find + - name: main-write + tags: + phase: main + type: write + params: + ratio: <> + statements: + - main-insert: | + { + insert: "<>", + documents: [ { key: {rw_key}, + value: {rw_value} } ] + } + tags: + name: main-insert diff --git a/driver-mongodb/src/main/resources/mongodb.md b/driver-mongodb/src/main/resources/mongodb.md new file mode 100644 index 000000000..b5f48dfdd --- /dev/null +++ b/driver-mongodb/src/main/resources/mongodb.md @@ -0,0 +1,3 @@ +# MongoDB Driver + +This is a driver for MongoDB. \ No newline at end of file diff --git a/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/MongoActivityTest.java b/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/MongoActivityTest.java new file mode 100644 index 000000000..1c37057e5 --- /dev/null +++ b/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/MongoActivityTest.java @@ -0,0 +1,26 @@ +package io.nosqlbench.driver.mongodb; + +import org.junit.Test; + +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; +import io.nosqlbench.engine.api.activityimpl.ActivityDef; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MongoActivityTest { + + @Test + public void testInitOpSequencer() { + String[] params = { + "yaml=activities/mongodb-basic.yaml", + "connection=mongodb://127.0.0.1", + "database=nosqlbench_testdb" + }; + ActivityDef activityDef = ActivityDef.parseActivityDef(String.join(";", params)); + MongoActivity mongoActivity = new MongoActivity(activityDef); + mongoActivity.initActivity(); + + OpSequence sequence = mongoActivity.initOpSequencer(); + assertThat(sequence.getOps()).hasSize(3); + } +} diff --git a/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/ReadyMongoStatementTest.java b/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/ReadyMongoStatementTest.java new file mode 100644 index 000000000..7c1acde25 --- /dev/null +++ b/driver-mongodb/src/test/java/io/nosqlbench/driver/mongodb/ReadyMongoStatementTest.java @@ -0,0 +1,107 @@ +package io.nosqlbench.driver.mongodb; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.nosqlbench.engine.api.activityconfig.ParsedStmt; +import io.nosqlbench.engine.api.activityconfig.StatementsLoader; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList; +import io.nosqlbench.engine.api.activityimpl.ActivityDef; +import io.nosqlbench.engine.api.templating.StrInterpolator; +import io.nosqlbench.virtdata.core.templates.BindPoint; +import org.bson.conversions.Bson; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ReadyMongoStatementTest { + private final static Logger logger = LoggerFactory.getLogger(ReadyMongoStatementTest.class); + + private ActivityDef activityDef; + private StmtsDocList stmtsDocList; + + @Before + public void setup() { + String[] params = { + "yaml=activities/mongodb-basic.yaml", + "database=nosqlbench_testdb", + }; + activityDef = ActivityDef.parseActivityDef(String.join(";", params)); + String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default"); + stmtsDocList = StatementsLoader.load(logger, yaml_loc, new StrInterpolator(activityDef), "activities"); + } + + @Test + public void testResolvePhaseRampup() { + String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:rampup"); + + List stmts = stmtsDocList.getStmts(tagfilter); + assertThat(stmts).hasSize(1); + for (StmtDef stmt : stmts) { + ParsedStmt parsed = stmt.getParsed().orError(); + assertThat(parsed.getBindPoints()).hasSize(2); + + BindPoint seqKey = new BindPoint("seq_key", "Mod(1000000000); ToString() -> String"); + BindPoint seqValue = new BindPoint("seq_value", "Hash(); Mod(1000000000); ToString() -> String"); + assertThat(parsed.getBindPoints()).containsExactly(seqKey, seqValue); + + String statement = parsed.getPositionalStatement(Function.identity()); + Objects.requireNonNull(statement); + + ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt); + Bson bsonDoc = readyMongoStatement.bind(1L); + assertThat(bsonDoc).isNotNull(); + } + } + + @Test + public void testResolvePhaseMainRead() { + String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:main-find"); + + List stmts = stmtsDocList.getStmts(tagfilter); + assertThat(stmts).hasSize(1); + for (StmtDef stmt : stmts) { + ParsedStmt parsed = stmt.getParsed().orError(); + assertThat(parsed.getBindPoints()).hasSize(1); + + BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000000)->int; ToString() -> String"); + assertThat(parsed.getBindPoints()).containsExactly(rwKey); + + String statement = parsed.getPositionalStatement(Function.identity()); + Objects.requireNonNull(statement); + + ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt); + Bson bsonDoc = readyMongoStatement.bind(1L); + assertThat(bsonDoc).isNotNull(); + } + } + + @Test + public void testResolvePhaseMainWrite() { + String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:main-insert"); + + List stmts = stmtsDocList.getStmts(tagfilter); + assertThat(stmts).hasSize(1); + for (StmtDef stmt : stmts) { + ParsedStmt parsed = stmt.getParsed().orError(); + assertThat(parsed.getBindPoints()).hasSize(2); + + BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000000)->int; ToString() -> String"); + BindPoint rwValue = new BindPoint("rw_value", "Hash(); Uniform(0,1000000000)->int; ToString() -> String"); + assertThat(parsed.getBindPoints()).containsExactly(rwKey, rwValue); + + String statement = parsed.getPositionalStatement(Function.identity()); + Objects.requireNonNull(statement); + + ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt); + Bson bsonDoc = readyMongoStatement.bind(1L); + assertThat(bsonDoc).isNotNull(); + } + } +} diff --git a/nb/pom.xml b/nb/pom.xml index e275e78b1..95de0d48f 100644 --- a/nb/pom.xml +++ b/nb/pom.xml @@ -98,6 +98,11 @@ 3.12.117-SNAPSHOT + + io.nosqlbench + driver-mongodb + 3.12.117-SNAPSHOT + @@ -245,6 +250,19 @@ + + with-mongodb + + false + + + + io.nosqlbench + driver-mongodb + 3.12.117-SNAPSHOT + + + build-nb-appimage diff --git a/pom.xml b/pom.xml index 9331801ad..887885208 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ driver-cqlverify driver-web driver-kafka + driver-mongodb