From f2f6649c9396dfdf6e38d1078fb3830399441816 Mon Sep 17 00:00:00 2001
From: Jonathan Shook
Date: Fri, 15 Mar 2024 13:04:32 -0500
Subject: [PATCH] partial variable capture work
---
.../adapter/cqld4/optypes/Cqld4CqlOp.java | 9 +-
.../cqld4/optypes/Cqld4RainbowTableOp.java | 11 +-
.../uniform/flowtypes/VariableCapture.java | 26 +++-
.../api/activityimpl/SimpleActivity.java | 17 +++
.../activityimpl/varcap/VarCapCycleOp.java | 43 ++++++
.../varcap/VarCapOpDispenserWrapper.java | 56 ++++++++
.../virtdata/core/templates/CapturePoint.java | 131 +++++++++++++++---
.../core/templates/CapturePointParser.java | 68 ++++++---
.../core/templates/ParsedTemplateString.java | 2 +-
.../templates/CapturePointParserTest.java | 22 ++-
.../core/templates/CapturePointTest.java | 9 +-
.../templates/ParsedTemplateStringTest.java | 5 +-
12 files changed, 336 insertions(+), 63 deletions(-)
create mode 100644 engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapCycleOp.java
create mode 100644 engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapOpDispenserWrapper.java
diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java
index 689c6b6fc..3270c99f8 100644
--- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java
+++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java
@@ -29,6 +29,7 @@ import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
+import io.nosqlbench.virtdata.core.templates.CapturePoint;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,7 +50,7 @@ import java.util.concurrent.*;
// TODO: add rows histogram resultSetSizeHisto
-public abstract class Cqld4CqlOp implements CycleOp>, VariableCapture, OpGenerator, OpResultSize {
+public abstract class Cqld4CqlOp implements CycleOp>, VariableCapture>, OpGenerator, OpResultSize {
private final static Logger logger = LogManager.getLogger(Cqld4CqlOp.class);
private final CqlSession session;
@@ -165,8 +166,10 @@ public abstract class Cqld4CqlOp implements CycleOp>, VariableCapture,
return next;
}
- public Map capture() {
- throw new NotImplementedException("Not implemented for Cqld4CqlOp");
+
+ @Override
+ public List> capture(List input, List capturePoints) {
+
}
public abstract Statement> getStmt();
diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4RainbowTableOp.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4RainbowTableOp.java
index 92fad8ed1..1b79b3f65 100644
--- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4RainbowTableOp.java
+++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4RainbowTableOp.java
@@ -22,7 +22,7 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import java.util.Map;
// Need to create RainbowTableStatement
-public class Cqld4RainbowTableOp implements CycleOp, VariableCapture, OpGenerator, OpResultSize {
+public class Cqld4RainbowTableOp implements CycleOp, OpGenerator, OpResultSize {
// private final CqlSession session;
// private final RainbowTableStatement stmt;
@@ -38,13 +38,4 @@ public class Cqld4RainbowTableOp implements CycleOp, VariableCapture,
throw new RuntimeException("implement me");
}
- @Override
- public Map capture() {
- throw new RuntimeException("implement me");
- }
-//
-// public Cqld4RainbowTableOp(CqlSession session, RainbowTableStatement stmt, int maxpages, boolean retryreplace) {
-// //
-// }
-
}
diff --git a/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/flowtypes/VariableCapture.java b/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/flowtypes/VariableCapture.java
index 19c3043d2..7ad806e52 100644
--- a/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/flowtypes/VariableCapture.java
+++ b/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/flowtypes/VariableCapture.java
@@ -16,9 +16,11 @@
package io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes;
+import io.nosqlbench.virtdata.core.templates.CapturePoint;
import io.nosqlbench.virtdata.core.templates.ParsedTemplateString;
+import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
-import java.util.Map;
+import java.util.List;
/**
* If an op implements VariableCapture, then it is known to be able to
@@ -29,6 +31,24 @@ import java.util.Map;
* and to allow for auto documentation tha the feature is supported for
* a given adapter.
*/
-public interface VariableCapture {
- Map capture();
+public interface VariableCapture {
+ List> capture(I input, List capturePoints);
+
+ default void applyCaptures(List capturePoints, List> values) {
+ for (int i = 0; i < capturePoints.size(); i++) {
+ CapturePoint cp = capturePoints.get(i);
+ String storeAs = cp.getStoredName();
+ CapturePoint.Scope storeScope = cp.getStoredScope();
+ Class> storedType = cp.getStoredType();
+
+
+ switch (storeScope) {
+ case stanza, container, thread ->
+ SharedState.tl_ObjectMap.get().put(storeAs, storedType.cast(values.get(i)));
+ case session ->
+ SharedState.gl_ObjectMap.put(storeAs, storedType.cast(values.get(i)));
+ }
+ }
+
+ }
}
diff --git a/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
index 2c02f9126..fe7458a7a 100644
--- a/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
+++ b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
@@ -56,6 +56,7 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.DryRunOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
+import io.nosqlbench.virtdata.core.templates.CapturePoint;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -451,6 +452,22 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
OpMapper extends Op> opMapper = adapter.getOpMapper();
OpDispenser extends Op> dispenser = opMapper.apply(pop);
+ List captures = pop.getCaptures();
+ if (!captures.isEmpty()) {
+ logger.debug("Creating test op for variable capture configuration (for " + pop.getName() + ")");
+ Op op = dispenser.apply(0);
+ if (op instanceof VariableCapture cap) {
+ if (op instanceof CycleOp> cycleOp) {
+ dispenser = new VarCapOpDispenserWrapper((DriverAdapter) adapter, pop,
+ dispenser);
+ } else {
+ throw new RuntimeException("Unable to wrap op dispenser for variable capture because of " +
+ "the core op implementation does not return a result. Use CycleOp for this.");
+ }
+
+ }
+ }
+
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
if ("op".equalsIgnoreCase(dryrunSpec)) {
dispenser = new DryRunOpDispenserWrapper((DriverAdapter)adapter, pop, dispenser);
diff --git a/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapCycleOp.java b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapCycleOp.java
new file mode 100644
index 000000000..f2e989528
--- /dev/null
+++ b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapCycleOp.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2024 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.engine.api.activityimpl.varcap;
+
+import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
+import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.VariableCapture;
+import io.nosqlbench.virtdata.core.templates.CapturePoint;
+
+import java.util.List;
+
+public class VarCapCycleOp implements CycleOp {
+ private final CycleOp realOp;
+ private final List capturePointList;
+
+ public VarCapCycleOp(CycleOp op, List varcaps) {
+ this.realOp = op;
+ this.capturePointList = varcaps;
+ }
+
+
+ @Override
+ public T apply(long value) {
+ T result = realOp.apply(value);
+ VariableCapture capturer = ((VariableCapture) this);
+ List> captured = capturer.capture(result, capturePointList);
+ capturer.applyCaptures(capturePointList,captured);
+ return result;
+ }
+}
diff --git a/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapOpDispenserWrapper.java b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapOpDispenserWrapper.java
new file mode 100644
index 000000000..d6268d1fe
--- /dev/null
+++ b/engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/varcap/VarCapOpDispenserWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2024 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.engine.api.activityimpl.varcap;
+
+import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
+import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
+import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
+import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
+import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.VariableCapture;
+import io.nosqlbench.adapters.api.templating.ParsedOp;
+import io.nosqlbench.virtdata.core.templates.CapturePoint;
+
+import java.util.List;
+import java.util.function.LongFunction;
+
+public class VarCapOpDispenserWrapper extends BaseOpDispenser {
+ private final OpDispenser extends Op> realDispenser;
+ private final LongFunction> opFunc;
+ private final List capturePoints;
+
+ public VarCapOpDispenserWrapper(DriverAdapter adapter, ParsedOp pop, OpDispenser extends Op> dispenser) {
+ super(adapter, pop);
+ this.realDispenser = dispenser;
+ this.capturePoints = pop.getCaptures();
+ Op exampleOp = realDispenser.apply(0L);
+
+
+ if (exampleOp instanceof CycleOp> cop) {
+ opFunc = l -> new VarCapCycleOp((CycleOp>)realDispenser.apply(l), capturePoints);
+ } else {
+ throw new RuntimeException("Invalid type for varcap:" + exampleOp.getClass().getCanonicalName());
+ }
+
+
+ }
+
+ @Override
+ public CycleOp> apply(long value) {
+ return opFunc.apply(value);
+ }
+}
diff --git a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePoint.java b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePoint.java
index dd826869f..70f62b822 100644
--- a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePoint.java
+++ b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePoint.java
@@ -19,28 +19,39 @@ package io.nosqlbench.virtdata.core.templates;
import java.util.Objects;
/**
- * A capture point is a named variable which should be extracted from a payload or result type
+ * A capture point is a named variable which should be extracted from a payload or result type
* using a native driver API. The result is meant to be provided to the NoSQLBench runtime
* during cycle execution, and stored in a scoped context of variables which can be re-used within
* other operations.
+ *
+ *
*
+ *
* Format
*
* {@code
* select [username as u1] from users where userid={userid};
* }
- *
+ *
* In the example above, the span [username as u1] is recognized as a capture point.
* The name of the variable to be captured is username. It is to be captured under
* a different variable name u1.
+ *
*
+ *
* If the name is the same in both cases, i.e. the variable is named in the result as it
* should be known after extraction, then you can elide the as u1 clause as in this example:
*
*
{@code
* select [username] from users where userid={userid};
* }
+ *
*
+ *
+ * The scope of the captured
+ *
+ *
+ *
* During op mapping, any capture points are condensed down to the native driver vernacular by
* removing the square brackets from the op template. Thus, the result of parsing the above would
* yield a form compatible with a native driver. For example, converting to prepared statement form
@@ -49,62 +60,142 @@ import java.util.Objects;
*
{@code
* select username from users where userid=:userid
* }
- *
+ *
* For details on the {userid} form, see {@link BindPoint}
*/
public class CapturePoint {
- private final String name;
- private final String asName;
+ /**
+ * The name of the capture point as known by the original protocol or driver API
+ */
+ private final String capturedName;
- public CapturePoint(String name, String asName) {
- this.name = name;
- this.asName = asName;
+ /**
+ * The name that the captured value should be stored as
+ */
+ private final String storedName;
+
+ private final Scope storedScope;
+
+ private final Class> storedType;
+ private final Class> elementType;
+
+ protected CapturePoint(
+ String capturedName,
+ String storedName,
+ Scope storedScope,
+ Class> storedType,
+ Class> elementType
+ ) {
+ this.capturedName = capturedName;
+ this.storedName = storedName;
+ this.storedScope = storedScope;
+ this.storedType = storedType;
+ this.elementType = elementType;
}
- public String getName() {
- return name;
+ public String getCapturedName() {
+ return capturedName;
}
+ public String getStoredName() {
+ return storedName;
+ }
+
+ public Scope getStoredScope() {
+ return this.storedScope;
+ }
+
+
/**
* Create a CapturePoint with the specified anchorName, and an optional aliasName.
* If aliasName is null, then the anchorName is used as the alias.
*
- * @param anchorName The name of the capture variable in the native form
- * @param aliasName The name of the captured value as seen by consumers
+ * @param capturedName
+ * The name of the capture variable in the native form
+ * @param storedName
+ * The name of the captured value as seen by consumers
* @return A new CapturePoint
*/
- public static CapturePoint of(String anchorName, String aliasName) {
- Objects.requireNonNull(anchorName);
- return new CapturePoint(anchorName, aliasName == null ? anchorName : aliasName);
+ public static CapturePoint of(String capturedName, String storedName, Scope scope, Class> storedType,
+ Class> elementType) {
+ Objects.requireNonNull(capturedName);
+ return new CapturePoint(capturedName, storedName == null ? capturedName : storedName, scope, storedType,
+ elementType);
}
/**
* Create a CapturePoint with the specified anchorName, and the same aliasName.
*
- * @param anchorName The name of the capture variable in the native form and as seen by consumers.
+ * @param anchorName
+ * The name of the capture variable in the native form and as seen by consumers.
* @return A new CapturePoint
*/
public static CapturePoint of(String anchorName) {
Objects.requireNonNull(anchorName);
- return new CapturePoint(anchorName, anchorName);
+ return new CapturePoint(anchorName, anchorName, Scope.stanza, Object.class, null);
}
@Override
public String toString() {
- return "[" + getName() + (name.equals(asName) ? "" : " as " + asName) + "]";
+ StringBuilder sb = new StringBuilder();
+ sb.append("[").append(capturedName);
+ if (!capturedName.equals(storedName)) {
+ sb.append(" as:").append(storedName);
+ }
+ sb.append(" scope:").append(storedScope.name());
+ if (storedType != Object.class) {
+ sb.append(" type:").append(storedType.getCanonicalName());
+ }
+ sb.append("]");
+
+ return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
+
CapturePoint that = (CapturePoint) o;
- return Objects.equals(name, that.name) && Objects.equals(asName, that.asName);
+
+ if (!capturedName.equals(that.capturedName)) return false;
+ if (!storedName.equals(that.storedName)) return false;
+ if (storedScope != that.storedScope) return false;
+ return storedType.equals(that.storedType);
}
@Override
public int hashCode() {
- return Objects.hash(name, asName);
+ int result = capturedName.hashCode();
+ result = 31 * result + storedName.hashCode();
+ result = 31 * result + storedScope.hashCode();
+ result = 31 * result + storedType.hashCode();
+ return result;
+ }
+
+ public Class> getStoredType() {
+ return this.storedType;
+ }
+
+ public static enum Scope {
+ /**
+ * The stanza scope includes the op sequence, but not the next iteration of an op sequence
+ */
+ stanza,
+ /**
+ * Thread scope is equivalent to thread-local, although it may be implemented using a different mechanism with
+ * virtual threads
+ */
+ thread,
+ /**
+ * Container scope is limited to the container within which an op is executed
+ */
+ container,
+
+ /**
+ * Session scope includes all containers, threads, and stanzas within a session
+ */
+ session
}
}
diff --git a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePointParser.java b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePointParser.java
index e51fdf2f7..7895cf53b 100644
--- a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePointParser.java
+++ b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/CapturePointParser.java
@@ -23,34 +23,70 @@ import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class CapturePointParser implements Function {
+public class CapturePointParser implements Function {
- public final static Pattern CAPTUREPOINT_PATTERN = Pattern.compile(
- "(\\[(?\\w+[-_\\d\\w.]*)(\\s+[aA][sS]\\s+(?\\w+[-_\\d\\w.]*))?])"
+ public final static Pattern CAPTURE_PARAM_PATTERN = Pattern.compile(
+ "(\\s*(?as|scope|type)\\s*:\\s*(?[-_.a-zA-Z0-9<>]+))"
);
+ public final static Pattern CAPTURE_POINT_PATTERN = Pattern.compile(
+ "\\[\\s*(?[a-z][_a-zA-Z0-9]*)(?" + CAPTURE_PARAM_PATTERN.pattern() + "+)*]"
+ );
+
@Override
- public Result apply(String template) {
+ public ParsedCapturePoint apply(String template) {
StringBuilder raw = new StringBuilder();
- Matcher m = CAPTUREPOINT_PATTERN.matcher(template);
- List captures = new ArrayList<>();
+ Matcher m = CAPTURE_POINT_PATTERN.matcher(template);
+ List captures = new ArrayList<>();
while (m.find()) {
- CapturePoint captured = CapturePoint.of(m.group("capture"), m.group("alias"));
+ String captureName = m.group("name");
+ String captureParams = m.group("params");
+ String storedName = captureName;
+ Class> storedClass = Object.class;
+ Class> elementType = null;
+ io.nosqlbench.virtdata.core.templates.CapturePoint.Scope storedScope = io.nosqlbench.virtdata.core.templates.CapturePoint.Scope.stanza;
+
+ if (captureParams != null) {
+ Matcher pfinder = CAPTURE_PARAM_PATTERN.matcher(captureParams);
+ while (pfinder.find()) {
+ String param = pfinder.group("param");
+ String value = pfinder.group("value");
+ switch (param) {
+ case "as":
+ storedName = value;
+ break;
+ case "scope":
+ storedScope = io.nosqlbench.virtdata.core.templates.CapturePoint.Scope.valueOf(value);
+ break;
+ case "type":
+ try {
+ storedClass = Class.forName(value);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ }
+ }
+ io.nosqlbench.virtdata.core.templates.CapturePoint captured =
+ io.nosqlbench.virtdata.core.templates.CapturePoint.of(captureName, storedName, storedScope, storedClass,
+ null);
captures.add(captured);
- m.appendReplacement(raw,captured.getName());
+ m.appendReplacement(raw, captured.getCapturedName());
+
+
}
m.appendTail(raw);
- return new Result(raw.toString(),captures);
+ return new ParsedCapturePoint(raw.toString(), captures);
}
- public final static class Result {
+ public final static class ParsedCapturePoint {
private final String rawTemplate;
- private final List captures;
-
- public Result(String rawTemplate, List captures) {
+ private final List captures;
+ public ParsedCapturePoint(String rawTemplate, List captures) {
this.rawTemplate = rawTemplate;
this.captures = captures;
}
@@ -59,7 +95,7 @@ public class CapturePointParser implements Function getCaptures() {
+ public List getCaptures() {
return this.captures;
}
@@ -67,8 +103,8 @@ public class CapturePointParser implements Function