implement scripting in client to support fluent API usage with generated values

This commit is contained in:
Jonathan Shook
2022-01-13 23:20:25 -06:00
parent 37a0954fa8
commit 52c892a837
22 changed files with 247 additions and 29 deletions

View File

@@ -19,6 +19,11 @@
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.9</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>

View File

@@ -1,8 +1,8 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -16,7 +16,7 @@ public class CqlDriverAdapterStub extends Cqld4DriverAdapter {
}
@Override
public OpMapper<Cqld4BaseOp> getOpMapper() {
public OpMapper<Op> getOpMapper() {
logger.warn("This version of NoSQLBench uses the DataStax Java Driver version 4 for all CQL workloads. In this preview version, advanced testing features present in the previous cql and cqld3 drivers are being back-ported. If you need those features, please use only the release artifacts. To suppress this message, use driver=cqld4. This warning will be removed in a future version when all features are completely back-ported.");
return super.getOpMapper();
}

View File

@@ -1,8 +1,8 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -16,7 +16,7 @@ public class Cqld3DriverAdapterStub extends Cqld4DriverAdapter {
}
@Override
public OpMapper<Cqld4BaseOp> getOpMapper() {
public OpMapper<Op> getOpMapper() {
logger.warn("This version of NoSQLBench uses the DataStax Java Driver version 4 for all CQL workloads. In this preview version, advanced testing features present in the previous cql and cqld3 drivers are being back-ported. If you need those features, please use only the release artifacts. To suppress this message, use driver=cqld4. This warning will be removed in a future version when all features are completely back-ported.");
return super.getOpMapper();
}

View File

@@ -1,11 +1,11 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.opmappers.Cqld4OpMapper;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@@ -13,10 +13,10 @@ import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "cqld4")
public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4BaseOp, Cqld4Space> {
public class Cqld4DriverAdapter extends BaseDriverAdapter<Op, Cqld4Space> {
@Override
public OpMapper<Cqld4BaseOp> getOpMapper() {
public OpMapper<Op> getOpMapper() {
DriverSpaceCache<? extends Cqld4Space> spaceCache = getSpaceCache();
NBConfiguration config = getConfiguration();
return new Cqld4OpMapper(config, spaceCache);

View File

@@ -3,6 +3,9 @@ package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.internal.core.config.composite.CompositeDriverConfigLoader;
import io.nosqlbench.engine.api.util.SSLKsFactory;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.content.Content;
@@ -36,7 +39,13 @@ public class Cqld4Space {
private CqlSession createSession(NBConfiguration cfg) {
CqlSessionBuilder builder = new CqlSessionBuilder();
resolveConfigLoader(cfg).ifPresent(builder::withConfigLoader);
OptionsMap defaults = new OptionsMap();
defaults.put(TypedDriverOption.MONITOR_REPORTING_ENABLED, false);
DriverConfigLoader driverConfigLoader = DriverConfigLoader.fromMap(defaults);
DriverConfigLoader mainCfgLoader = resolveConfigLoader(cfg).orElse(DriverConfigLoader.fromMap(OptionsMap.driverDefaults()));
driverConfigLoader = new CompositeDriverConfigLoader(driverConfigLoader,mainCfgLoader);
builder.withConfigLoader(driverConfigLoader);
int port = cfg.getOrDefault("port", 9042);

View File

@@ -2,5 +2,6 @@ package io.nosqlbench.adapter.cqld4.opmappers;
public enum CqlD4OpType {
cql,
gremlin
gremlin,
fluent
}

View File

@@ -0,0 +1,53 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatementBuilder;
import com.datastax.oss.driver.api.core.CqlSession;
import groovy.lang.Script;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4FluentGraphOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class Cqld4FluentGraphOpDispenser extends BaseOpDispenser<Op> {
private final LongFunction<? extends String> graphnameFunc;
private final CqlSession session;
private final Bindings virtdataBindings;
private final ThreadLocal<Script> tlScript;
public Cqld4FluentGraphOpDispenser(
ParsedOp optpl,
LongFunction<? extends String> graphnameFunc,
CqlSession session,
Bindings virtdataBindings,
Supplier<Script> scriptSource
) {
super(optpl);
this.graphnameFunc = graphnameFunc;
this.session = session;
this.virtdataBindings = virtdataBindings;
this.tlScript = ThreadLocal.withInitial(scriptSource);
}
@Override
public Op apply(long value) {
String graphname = graphnameFunc.apply(value);
Script script = tlScript.get();
Map<String, Object> allMap = virtdataBindings.getAllMap(value);
allMap.forEach((k,v) -> script.getBinding().setVariable(k,v));
GraphTraversal<Vertex,Vertex> v = (GraphTraversal<Vertex, Vertex>) script.run();
FluentGraphStatement fgs = new FluentGraphStatementBuilder(v).setGraphName(graphname).build();
return new Cqld4FluentGraphOp(session,fgs);
}
}

View File

@@ -0,0 +1,50 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.dse.driver.api.core.graph.DseGraph;
import com.datastax.oss.driver.api.core.CqlSession;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class Cqld4FluentGraphOpMapper implements OpMapper<Op> {
private final CqlSession session;
public Cqld4FluentGraphOpMapper(CqlSession session) {
this.session = session;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp cmd) {
GraphTraversalSource g = DseGraph.g;
ParsedTemplate fluent = cmd.getAsTemplate("fluent").orElseThrow();
String scriptBodyWithRawVarRefs = fluent.getPositionalStatement();
Supplier<Script> supplier = () -> {
groovy.lang.Binding groovyBindings = new Binding(new LinkedHashMap<String,Object>(Map.of("g",g)));
GroovyShell gshell = new GroovyShell(groovyBindings);
return gshell.parse(scriptBodyWithRawVarRefs);
};
LongFunction<? extends String> graphnameFunc = cmd.getAsRequiredFunction("graphname");
Bindings virtdataBindings = new BindingsTemplate(fluent.getBindPoints()).resolveBindings();
// Map<String, Object> values = virtdataBindings.getAllMap(1L);
// values.forEach(groovyBindings::setVariable);
// GraphTraversal<Vertex,Vertex> v = (GraphTraversal<Vertex, Vertex>) parsed.run();
// FluentGraphStatement fgs = new FluentGraphStatementBuilder(v).setGraphName("graph_wheels").build();
return new Cqld4FluentGraphOpDispenser(cmd, graphnameFunc, session, virtdataBindings, supplier);
}
}

View File

@@ -1,18 +1,19 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
public class Cqld4GremlinOpMapper implements OpMapper<Cqld4GremlinOp> {
public class Cqld4GremlinOpMapper implements OpMapper<Cqld4ScriptGraphOp> {
private final CqlSession session;
public Cqld4GremlinOpMapper(CqlSession session) {
this.session = session;
}
public OpDispenser<Cqld4GremlinOp> apply(ParsedOp cmd) {
public OpDispenser<Cqld4ScriptGraphOp> apply(ParsedOp cmd) {
return new GremlinOpDispenser(session, cmd);
}
}

View File

@@ -2,14 +2,14 @@ package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Space;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
public class Cqld4OpMapper implements OpMapper<Cqld4BaseOp> {
public class Cqld4OpMapper implements OpMapper<Op> {
private final DriverSpaceCache<? extends Cqld4Space> cache;
@@ -30,7 +30,7 @@ public class Cqld4OpMapper implements OpMapper<Cqld4BaseOp> {
* which ones are static and dynamic.
* @return An op dispenser for each provided op command
*/
public OpDispenser<? extends Cqld4BaseOp> apply(ParsedOp cmd) {
public OpDispenser<? extends Op> apply(ParsedOp cmd) {
Cqld4Space cqld4Space = cache.get(cmd.getStaticConfigOr("space", "default"));
CqlSession session = cqld4Space.getSession();
@@ -42,6 +42,7 @@ public class Cqld4OpMapper implements OpMapper<Cqld4BaseOp> {
return switch (cmdtype) {
case cql -> new CqlD4CqlOpMapper(session).apply(cmd);
case gremlin -> new Cqld4GremlinOpMapper(session).apply(cmd);
case fluent -> new Cqld4FluentGraphOpMapper(session).apply(cmd);
};
}

View File

@@ -3,13 +3,14 @@ package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatementBuilder;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.Optional;
import java.util.function.LongFunction;
public class GremlinOpDispenser extends BaseOpDispenser<Cqld4GremlinOp> {
public class GremlinOpDispenser extends BaseOpDispenser<Cqld4ScriptGraphOp> {
private final LongFunction<? extends ScriptGraphStatement> stmtFunc;
private final CqlSession session;
@@ -62,12 +63,12 @@ public class GremlinOpDispenser extends BaseOpDispenser<Cqld4GremlinOp> {
}
@Override
public Cqld4GremlinOp apply(long value) {
public Cqld4ScriptGraphOp apply(long value) {
ScriptGraphStatement stmt = stmtFunc.apply(value);
if (diagFunc.apply(value)>0L) {
System.out.println("## GREMLIN DIAG: ScriptGraphStatement on graphname(" + stmt.getGraphName() + "):\n" + stmt.getScript());
}
return new Cqld4GremlinOp(session, stmt);
return new Cqld4ScriptGraphOp(session, stmt);
}
}

View File

@@ -1,6 +0,0 @@
package io.nosqlbench.adapter.cqld4.optypes;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
public abstract class Cqld4BaseOp implements Op {
}

View File

@@ -32,7 +32,7 @@ import java.util.Map;
// TODO: add rows histogram resultSetSizeHisto
public abstract class Cqld4CqlOp extends Cqld4BaseOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
private final CqlSession session;
private final int maxpages;

View File

@@ -0,0 +1,29 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.dse.driver.api.core.graph.GraphResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public class Cqld4FluentGraphOp implements CycleOp<GraphResultSet> {
private final CqlSession session;
private final FluentGraphStatement stmt;
private int resultSize=0;
public Cqld4FluentGraphOp(CqlSession session, FluentGraphStatement stmt) {
this.session = session;
this.stmt = stmt;
}
@Override
public GraphResultSet apply(long value) {
GraphResultSet result = session.execute(stmt);
this.resultSize = result.all().size();
return result;
}
@Override
public long getResultSize() {
return resultSize;
}
}

View File

@@ -1,17 +1,16 @@
package io.nosqlbench.adapter.cqld4.opmappers;
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.dse.driver.api.core.graph.GraphResultSet;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public class Cqld4GremlinOp extends Cqld4BaseOp implements CycleOp<GraphResultSet> {
public class Cqld4ScriptGraphOp implements CycleOp<GraphResultSet> {
private final CqlSession session;
private final ScriptGraphStatement stmt;
private int resultSize=0;
public Cqld4GremlinOp(CqlSession session, ScriptGraphStatement stmt) {
public Cqld4ScriptGraphOp(CqlSession session, ScriptGraphStatement stmt) {
this.session = session;
this.stmt = stmt;
}

View File

@@ -8,6 +8,7 @@ scenarios:
schema: run driver=cqld4 graphname=graph_wheels tags=phase:graph-schema cycles===UNDEF
disable-verify: run driver=cqld4 graphname=graph_wheels tags=phase:disable-verify cycles===UNDEF
rampup: run driver==cqld4 graphname=graph_wheels tags=phase:rampup cycles=1000
fluent: run driver=cqld4 graphname=graph_wheels tags=block:fluent cycles=10
default:
creategraph: run driver=cqld4 graphname=graph_wheels tags=phase:create-graph cycles===UNDEF
schema: run driver=cqld4 graphname=graph_wheels tags=phase:graph-schema cycles===UNDEF
@@ -117,5 +118,11 @@ blocks:
.property('createdtime', {createdtime})
.as('s')
.addE('using').from('s').to('d');
fluent:
statements:
read:
type: fluent
graphname: <<graphname:graph_wheels>>
fluent: >-
g.V().hasLabel("device").has("deviceid", UUID.fromString({deviceid}))

View File

@@ -68,6 +68,9 @@ public abstract class BaseOpDispenser<T> implements OpDispenser<T> {
@Override
public void onError(long cycleValue, long resultNanos, Throwable t) {
if (!instrument) {
return;
}
errorTimer.update(resultNanos, TimeUnit.NANOSECONDS);
}

View File

@@ -0,0 +1,19 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import java.util.function.LongFunction;
public class StandardCycleOp<T> implements CycleOp<T> {
private final LongFunction<? extends T> opfunc;
public StandardCycleOp(LongFunction<? extends T> opfunc) {
this.opfunc = opfunc;
}
@Override
public T apply(long value) {
return opfunc.apply(value);
}
}

View File

@@ -0,0 +1,21 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class StandardOpDispenser<T extends Op> extends BaseOpDispenser<T>{
private final LongFunction<T> opfunc;
public StandardOpDispenser(ParsedOp op, LongFunction<T> opfunc) {
super(op);
this.opfunc = opfunc;
}
@Override
public T apply(long value) {
return opfunc.apply(value);
}
}

View File

@@ -154,6 +154,11 @@ public class ParsedOp implements LongFunction<Map<String, ?>>, StaticFieldReader
return _opTemplate.getParsed();
}
public Optional<ParsedTemplate> getAsTemplate(String fieldname) {
return this.tmap.getAsTemplate(fieldname);
}
/**
* Get the named static field value, or return the provided default, but throw an exception if
* the named field is dynamic.

View File

@@ -64,6 +64,8 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
*/
private final LinkedHashMap<String, Object> protomap = new LinkedHashMap<>();
private final List<Map<String, Object>> cfgsources;
private Map<String, Object> specmap;
private Map<String, String> bindings;
public ParsedTemplateMap(Map<String, Object> map, Map<String, String> bindings, List<Map<String, Object>> cfgsources) {
this.cfgsources = cfgsources;
@@ -75,6 +77,8 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
// fields. This seems like the saner and less confusing approach, so implementing
// op field references should be left until it is requested if at all
private void applyTemplateFields(Map<String, Object> map, Map<String, String> bindings) {
this.specmap = map;
this.bindings = bindings;
map.forEach((k, v) -> {
if (v instanceof CharSequence) {
ParsedTemplate pt = ParsedTemplate.of(((CharSequence) v).toString(), bindings);
@@ -495,6 +499,18 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
return false;
}
public Optional<ParsedTemplate> getAsTemplate(String fieldname) {
if (specmap.containsKey(fieldname)) {
Object fval = specmap.get(fieldname);
if (fval instanceof CharSequence) {
return Optional.of(new ParsedTemplate(fval.toString(),this.bindings));
} else {
throw new RuntimeException("Can not make a parsed text template from op template field '" + fieldname +"' of type '" + fval.getClass().getSimpleName() + "'");
}
}
return Optional.empty();
}
/**
* convenience method for conjugating {@link #isDefined(String)} with AND
*
@@ -677,4 +693,5 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
}
return Optional.empty();
}
}

View File

@@ -280,6 +280,9 @@ public class ParsedTemplate {
}
return sb.toString();
}
public String getPositionalStatement() {
return getPositionalStatement(s -> s);
}
/**
* Return the parsed template in (<em>literal, variable, ..., ..., literal</em>) form.