convert mongodb driver from ActivityType to DriverAdapter

This commit is contained in:
Jonathan Shook 2022-06-27 23:45:06 -05:00
parent 5e1898bf20
commit 5977ad7c0f
24 changed files with 349 additions and 540 deletions

View File

@ -17,7 +17,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>driver-mongodb</artifactId> <artifactId>adapter-mongodb</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<parent> <parent>
@ -29,8 +29,7 @@
<name>${project.artifactId}</name> <name>${project.artifactId}</name>
<description> <description>
An nosqlbench ActivityType (AT) driver module; An nosqlbench DriverAdapter module for MongoDB
MongoDB
</description> </description>
<dependencies> <dependencies>
@ -43,7 +42,7 @@
<dependency> <dependency>
<groupId>org.mongodb</groupId> <groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId> <artifactId>mongodb-driver-sync</artifactId>
<version>4.5.0</version> <version>4.6.1</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.core;
import com.mongodb.ReadPreference;
import io.nosqlbench.adapter.mongodb.ops.MongoOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Map;
import java.util.function.LongFunction;
public class MongoOpDispenser extends BaseOpDispenser<Op> {
private final LongFunction<MongoOp> opFunc;
private final LongFunction<MongoOp> mongoOpF;
public MongoOpDispenser(LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
super(op);
opFunc = createOpFunc(ctxFunc, op);
this.mongoOpF = createOpFunc(ctxFunc,op);
}
private LongFunction<MongoOp> createOpFunc(LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
LongFunction<String> rpstring = op.getAsOptionalFunction("readPreference")
.orElseGet(() -> op.getAsOptionalFunction("read-preference")
.orElse(l -> "primary"));
LongFunction<ReadPreference> readPreferenceF = l -> ReadPreference.valueOf(rpstring.apply(l));
LongFunction<?> payload = op.getAsRequiredFunction("stmt", Object.class);
Object exampleValue = payload.apply(0);
LongFunction<Bson> bsonFunc;
if (exampleValue instanceof CharSequence cs) {
bsonFunc = l -> Document.parse(payload.apply(l).toString());
} else if ( exampleValue instanceof Map map) {
bsonFunc = l -> new Document((Map<String,Object>)payload.apply(l));
} else {
throw new RuntimeException("You must provide a String or Map for your BSON payload.");
}
LongFunction<String> databaseNamerF = op.getAsRequiredFunction("database", String.class);
return l-> new MongoOp(
ctxFunc.apply(l).getClient(),
databaseNamerF.apply(l),
bsonFunc.apply(l)
);
}
@Override
public Op apply(long cycle) {
return mongoOpF.apply(cycle);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.core;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.api.NBNamedElement;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.config.standard.Param;
import org.bson.UuidRepresentation;
import org.bson.codecs.UuidCodec;
import org.bson.codecs.configuration.CodecRegistry;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
public class MongoSpace implements NBNamedElement {
private final String name;
private final NBConfiguration cfg;
private final String connectionString;
private final MongoClient client;
private MongoDatabase mongoDatabase;
public MongoSpace(String name, NBConfiguration cfg) {
this.name = name;
this.cfg = cfg;
this.connectionString = cfg.get("connection",String.class);
this.client = createMongoClient(connectionString);
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(MongoSpace.class)
.add(Param.required("connection", String.class)
.setDescription("The connection string for your MongoDB endpoint"))
.add(Param.required("database", String.class)
.setDescription("The database name to connect to."))
.asReadOnly();
}
@Override
public String getName() {
return name;
}
public MongoClient createMongoClient(String connectionString) {
CodecRegistry codecRegistry = fromRegistries(
fromCodecs(new UuidCodec(UuidRepresentation.STANDARD)),
MongoClientSettings.getDefaultCodecRegistry()
);
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.codecRegistry(codecRegistry)
.uuidRepresentation(UuidRepresentation.STANDARD)
.build();
return MongoClients.create(settings);
}
protected MongoDatabase getDatabase() {
return mongoDatabase;
}
public MongoClient getClient() {
return this.client;
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.core;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.Function;
/**
* Special thanks to Justin Chu who authored the original NoSQLBench MongoDB ActivityType.
*/
@Service(value=DriverAdapter.class, selector ="mongodb")
public class MongodbDriverAdapter extends BaseDriverAdapter<Op, MongoSpace> {
@Override
public OpMapper<Op> getOpMapper() {
return new MongodbOpMapper(getSpaceCache());
}
@Override
public Function<String, ? extends MongoSpace> getSpaceInitializer(NBConfiguration cfg) {
return s -> new MongoSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(MongoSpace.getConfigModel());
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.core;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class MongodbOpMapper implements OpMapper<Op> {
private final DriverSpaceCache<? extends MongoSpace> ctxcache;
public MongodbOpMapper(DriverSpaceCache<? extends MongoSpace> ctxcache) {
this.ctxcache = ctxcache;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp op) {
LongFunction<String> ctxNamer = op.getAsFunctionOr("space","default");
LongFunction<MongoSpace> ctxFunc = l -> ctxcache.get(ctxNamer.apply(l));
return new MongoOpDispenser(ctxFunc, op);
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.ops;
import com.mongodb.client.MongoClient;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import org.bson.Document;
import org.bson.conversions.Bson;
public class MongoOp implements CycleOp<Document> {
private final MongoClient client;
private final Bson rqBson;
private final String database;
private int resultSize;
// https://docs.mongodb.com/manual/reference/method/db.runCommand/#command-response
public MongoOp(MongoClient client, String database, Bson rqBson) {
this.client = client;
this.database = database;
this.rqBson = rqBson;
}
@Override
public Document apply(long value) {
Document document = client.getDatabase(database).runCommand(rqBson);
int ok = document.getInteger("ok",0);
if (ok!=1) {
throw new MongoOpFailedException(rqBson, ok, document);
}
this.resultSize = document.getInteger("n");
return document;
}
@Override
public long getResultSize() {
return resultSize;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.mongodb.ops;
import org.bson.Document;
import org.bson.conversions.Bson;
public class MongoOpFailedException extends RuntimeException {
private final Bson rqBson;
private final int status;
private final Document document;
public MongoOpFailedException(Bson rqBson, int status, Document document) {
this.rqBson = rqBson;
this.status = status;
this.document = document;
}
@Override
public String getMessage() {
return "Error in MongoDB response, status=" + status + ", request=" + rqBson.toString() + " response=" + document.toString() +
": " + super.getMessage();
}
}

View File

@ -1,91 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.concurrent.TimeUnit;
public class MongoAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(MongoAction.class);
private final MongoActivity activity;
private final int slot;
private OpSequence<ReadyMongoStatement> sequencer;
public MongoAction(MongoActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
}
@Override
public void init() {
this.sequencer = activity.getOpSequencer();
}
@Override
public int runCycle(long cycle) {
ReadyMongoStatement rms;
Bson queryBson;
try (Timer.Context bindTime = activity.bindTimer.time()) {
rms = sequencer.apply(cycle);
queryBson = rms.bind(cycle);
// Maybe show the query in log/console - only for diagnostic use
if (activity.isShowQuery()) {
logger.info("Query(cycle={}):\n{}", cycle, queryBson);
}
}
long nanoStartTime = System.nanoTime();
for (int i = 1; i <= activity.getMaxTries(); i++) {
activity.triesHisto.update(i);
try (Timer.Context resultTime = activity.resultTimer.time()) {
// assuming the commands are one of these in the doc:
// https://docs.mongodb.com/manual/reference/command/nav-crud/
Document resultDoc = activity.getDatabase().runCommand(queryBson, rms.getReadPreference());
long resultNanos = System.nanoTime() - nanoStartTime;
// TODO: perhaps collect the operationTime from the resultDoc if any
// https://docs.mongodb.com/manual/reference/method/db.runCommand/#command-response
int ok = Double.valueOf((double) resultDoc.getOrDefault("ok", 0.0d)).intValue();
if (ok == 1) {
// success
activity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
}
activity.resultSetSizeHisto.update(resultDoc.getInteger("n", 0));
return ok == 1 ? 0 : 1;
} catch (Exception e) {
logger.error("Failed to runCommand {} on cycle {}, tries {}", queryBson, cycle, i, e);
}
}
throw new RuntimeException(String.format("Exhausted max tries (%s) on cycle %s",
activity.getMaxTries(), cycle));
}
}

View File

@ -1,173 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.util.TagFilter;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.UuidRepresentation;
import org.bson.codecs.UuidCodec;
import org.bson.codecs.configuration.CodecRegistry;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
public class MongoActivity extends SimpleActivity implements ActivityDefObserver {
private final static Logger logger = LogManager.getLogger(MongoActivity.class);
private String yamlLoc;
private String connectionString;
private String databaseName;
private MongoClient client;
private MongoDatabase mongoDatabase;
private boolean showQuery;
private OpSequence<ReadyMongoStatement> opSequence;
Timer bindTimer;
Timer resultTimer;
Timer resultSuccessTimer;
Histogram triesHisto;
Histogram resultSetSizeHisto;
public MongoActivity(ActivityDef activityDef) {
super(activityDef);
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
// sanity check
yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
.orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
connectionString = activityDef.getParams().getOptionalString("connection")
.orElseThrow(() -> new IllegalArgumentException("connection is not defined"));
// TODO: support multiple databases
databaseName = activityDef.getParams().getOptionalString("database")
.orElseThrow(() -> new IllegalArgumentException("database is not defined"));
}
@Override
public void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
onActivityDefUpdate(activityDef);
opSequence = initOpSequencer();
setDefaultsFromOpSequence(opSequence);
client = createMongoClient(connectionString);
mongoDatabase = client.getDatabase(databaseName);
showQuery = activityDef.getParams().getOptionalBoolean("showquery")
.orElse(false);
bindTimer = ActivityMetrics.timer(activityDef, "bind", this.getHdrDigits());
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size", this.getHdrDigits());
triesHisto = ActivityMetrics.histogram(activityDef, "tries", this.getHdrDigits());
}
@Override
public void shutdownActivity() {
logger.debug("shutting down activity: " + this.activityDef.getAlias());
if (client != null) {
client.close();
}
}
OpSequence<ReadyMongoStatement> initOpSequencer() {
SequencerType sequencerType = SequencerType.valueOf(
activityDef.getParams().getOptionalString("seq").orElse("bucket")
);
SequencePlanner<ReadyMongoStatement> sequencer = new SequencePlanner<>(sequencerType);
StmtsDocList stmtsDocList = StatementsLoader.loadPath(
logger,
yamlLoc,
activityDef.getParams(),
"activities"
);
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
TagFilter tagFilter = new TagFilter(tagfilter);
stmtsDocList.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog()));
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
if (stmts.isEmpty()) {
logger.error("No statements found for this activity");
} else {
for (OpTemplate stmt : stmts) {
ParsedTemplate parsed = stmt.getParsed().orElseThrow();
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
sequencer.addOp(new ReadyMongoStatement(stmt), stmt.getParamOrDefault("ratio", 1));
}
}
return sequencer.resolve();
}
MongoClient createMongoClient(String connectionString) {
CodecRegistry codecRegistry = fromRegistries(fromCodecs(new UuidCodec(UuidRepresentation.STANDARD)),
MongoClientSettings.getDefaultCodecRegistry());
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(connectionString))
.codecRegistry(codecRegistry)
.uuidRepresentation(UuidRepresentation.STANDARD)
.build();
return MongoClients.create(settings);
}
protected MongoDatabase getDatabase() {
return mongoDatabase;
}
protected OpSequence<ReadyMongoStatement> getOpSequencer() {
return opSequence;
}
protected boolean isShowQuery() {
return showQuery;
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "mongodb")
public class MongoActivityType implements ActivityType<MongoActivity> {
@Override
public MongoActivity getActivity(ActivityDef activityDef) {
return new MongoActivity(activityDef);
}
@Override
public ActionDispenser getActionDispenser(MongoActivity activity) {
return new MongoActionDispenser(activity);
}
private static class MongoActionDispenser implements ActionDispenser {
private final MongoActivity activity;
public MongoActionDispenser(MongoActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new MongoAction(activity, slot);
}
}
}

View File

@ -1,51 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import com.mongodb.ReadPreference;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.bson.Document;
import org.bson.conversions.Bson;
public class ReadyMongoStatement {
private final StringBindings bindings;
private final ReadPreference readPreference;
public ReadyMongoStatement(OpTemplate stmtDef) {
ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt().orElseThrow(), stmtDef.getBindings());
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt().orElseThrow(), paramBindings);
this.bindings = template.resolve();
this.readPreference = stmtDef.getOptionalStringParam("readPreference")
.map(ReadPreference::valueOf)
.orElse(ReadPreference.primary());
}
public ReadPreference getReadPreference() {
return readPreference;
}
public Bson bind(long value) {
return Document.parse(bindings.bind(value));
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import static org.assertj.core.api.Assertions.assertThat;
public class MongoActivityTest {
private ActivityDef activityDef;
@BeforeEach
public void setup() {
String[] params = {
"yaml=activities/mongodb-basic.yaml",
"connection=mongodb://127.0.0.1",
"database=nosqlbench_testdb"
};
activityDef = ActivityDef.parseActivityDef(String.join(";", params));
}
@Test
public void testInitOpSequencer() {
MongoActivity mongoActivity = new MongoActivity(activityDef);
mongoActivity.initActivity();
OpSequence<ReadyMongoStatement> sequence = mongoActivity.initOpSequencer();
assertThat(sequence.getOps()).hasSize(3);
}
}

View File

@ -1,121 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.driver.mongodb;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.virtdata.core.templates.BindPoint;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
public class ReadyMongoStatementTest {
private final static Logger logger = LogManager.getLogger(ReadyMongoStatementTest.class);
private ActivityDef activityDef;
private StmtsDocList stmtsDocList;
@BeforeEach
public void setup() {
String[] params = {
"yaml=activities/mongodb-basic.yaml",
"database=nosqlbench_testdb",
};
activityDef = ActivityDef.parseActivityDef(String.join(";", params));
String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
stmtsDocList = StatementsLoader.loadPath(logger, yaml_loc, activityDef.getParams(), "activities");
}
@Test
public void testResolvePhaseRampup() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:rampup");
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (OpTemplate stmt : stmts) {
ParsedTemplate parsed = stmt.getParsed().orElseThrow();
assertThat(parsed.getBindPoints()).hasSize(2);
BindPoint seqKey = new BindPoint("seq_key", "Mod(1000000L); ToInt()");
BindPoint seqValue = new BindPoint("seq_value", "Mod(1000000000L); Hash(); ToString() -> String");
assertThat(parsed.getBindPoints()).containsExactly(seqKey, seqValue);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
@Test
public void testResolvePhaseMainRead() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:.*main-find");
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (OpTemplate stmt : stmts) {
ParsedTemplate parsed = stmt.getParsed().orElseThrow();
assertThat(parsed.getBindPoints()).hasSize(1);
BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000)->long; ToInt()");
assertThat(parsed.getBindPoints()).containsExactly(rwKey);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
@Test
public void testResolvePhaseMainWrite() {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("phase:main,name:.*main-insert");
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
assertThat(stmts).hasSize(1);
for (OpTemplate stmt : stmts) {
ParsedTemplate parsed = stmt.getParsed().orElseThrow();
assertThat(parsed.getBindPoints()).hasSize(2);
BindPoint rwKey = new BindPoint("rw_key", "Uniform(0,1000000)->long; ToInt()");
BindPoint rwValue = new BindPoint("rw_value", "Uniform(0,1000000000)->int; Hash(); ToString() -> String");
assertThat(parsed.getBindPoints()).containsExactly(rwKey, rwValue);
String statement = parsed.getPositionalStatement(Function.identity());
Objects.requireNonNull(statement);
ReadyMongoStatement readyMongoStatement = new ReadyMongoStatement(stmt);
Bson bsonDoc = readyMongoStatement.bind(1L);
assertThat(bsonDoc).isNotNull();
}
}
}

View File

@ -70,6 +70,12 @@
<version>4.17.15-SNAPSHOT</version> <version>4.17.15-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-mongodb</artifactId>
<version>4.17.15-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>io.nosqlbench</groupId> <groupId>io.nosqlbench</groupId>
<artifactId>adapter-http</artifactId> <artifactId>adapter-http</artifactId>

View File

@ -59,6 +59,7 @@
<module>adapter-dynamodb</module> <module>adapter-dynamodb</module>
<module>adapter-diag</module> <module>adapter-diag</module>
<module>adapter-stdout</module> <module>adapter-stdout</module>
<module>adapter-mongodb</module>
<!-- VIRTDATA MODULES --> <!-- VIRTDATA MODULES -->