From e43bcc9a0f86aedcade4effcc7edbab5bfc5c99c Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Sat, 24 Feb 2024 12:19:48 -0600 Subject: [PATCH] customizations which need to be generalized before merge --- .../cqld4/opmappers/CqlD4BatchStmtMapper.java | 79 ++++++++++++++++++ .../adapter/cqld4/opmappers/CqlD4OpType.java | 8 ++ .../cqld4/opmappers/Cqld4CoreOpMapper.java | 1 + .../functions/to_cqlvector/LoadCqlVector.java | 80 +++++++++++++++++++ .../extensions/vectormath/CqlUtils.java | 1 - .../adapter/http/JsonElementUtils.java | 39 ++++++++- .../lifecycle/activity/ActivityExecutor.java | 4 +- .../nb/api/config/standard/ConfigModel.java | 23 ++++-- .../core/templates/StringCompositor.java | 16 ++-- .../to_string/DirectoryLinesStable.java | 12 ++- .../to_string/DirectoryLinesStableTest.java | 11 +++ 11 files changed, 250 insertions(+), 24 deletions(-) create mode 100644 adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4BatchStmtMapper.java create mode 100644 adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/to_cqlvector/LoadCqlVector.java diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4BatchStmtMapper.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4BatchStmtMapper.java new file mode 100644 index 000000000..9a5102802 --- /dev/null +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4BatchStmtMapper.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.cqld4.opmappers; + +import com.datastax.oss.driver.api.core.CqlSession; +import io.nosqlbench.adapter.cqld4.Cqld4Processors; +import io.nosqlbench.adapter.cqld4.RSProcessors; +import io.nosqlbench.adapter.cqld4.ResultSetProcessor; +import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; +import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; +import io.nosqlbench.nb.api.config.params.ParamsParser; +import io.nosqlbench.nb.api.errors.BasicError; +import io.nosqlbench.virtdata.core.templates.ParsedTemplateString; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.LongFunction; + +public class CqlD4BatchStmtMapper implements OpMapper { + + private final LongFunction sessionFunc; + private final TypeAndTarget target; + private final DriverAdapter adapter; + + public CqlD4BatchStmtMapper(DriverAdapter adapter, LongFunction sessionFunc, TypeAndTarget target) { + this.sessionFunc=sessionFunc; + this.target = target; + this.adapter = adapter; + } + + public OpDispenser apply(ParsedOp op) { + + ParsedOp subop = op.getAsSubOp("op_template"); + int repeat = subop.getStaticConfigOr("repeat", 1); + + ParsedTemplateString stmtTpl = op.getAsTemplate(target.field).orElseThrow(() -> new BasicError( + "No statement was found in the op template:" + op + )); + + RSProcessors processors = new RSProcessors(); + if (stmtTpl.getCaptures().size()>0) { + processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures())); + } + + Optional processorList = op.getOptionalStaticConfig("processors", List.class); + + processorList.ifPresent(l -> { + l.forEach(m -> { + Map pconfig = ParamsParser.parseToMap(m, "type"); + ResultSetProcessor processor = Cqld4Processors.resolve(pconfig); + processors.add(() -> processor); + }); + }); + + return new Cqld4PreparedStmtDispenser(adapter, sessionFunc, op, stmtTpl, processors); + + } +} diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4OpType.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4OpType.java index 828f8ea1e..b37a986cf 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4OpType.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4OpType.java @@ -43,6 +43,14 @@ public enum CqlD4OpType { */ prepared, + /** + * Allows for a statement template to be used to create a batch statement. + * The fields 'op_template', and 'repeat' are required, and all fields below + * the op_template field are a nested version of the other op types here, but + * supports only the simple and prepared forms for historic compatibility reasons. + */ + batch, + /** * uses {@link com.datastax.dse.driver.api.core.graph.ScriptGraphStatement} * This is the "raw" mode of using gremlin. It is not as efficient, and thus diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/Cqld4CoreOpMapper.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/Cqld4CoreOpMapper.java index df8c1854f..a8f87abbd 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/Cqld4CoreOpMapper.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/Cqld4CoreOpMapper.java @@ -72,6 +72,7 @@ public class Cqld4CoreOpMapper implements OpMapper { case raw -> new CqlD4RawStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op); case simple -> new CqlD4CqlSimpleStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op); case prepared -> new CqlD4PreparedStmtMapper(adapter, sessionFunc, target).apply(op); + case batch -> new CqlD4BatchStmtMapper(adapter, sessionFunc, target).apply(op); case gremlin -> new Cqld4GremlinOpMapper(adapter, sessionFunc, target.targetFunction).apply(op); case fluent -> new Cqld4FluentGraphOpMapper(adapter, sessionFunc, target).apply(op); case rainbow -> new CqlD4RainbowTableMapper(adapter, sessionFunc, target.targetFunction).apply(op); diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/to_cqlvector/LoadCqlVector.java b/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/to_cqlvector/LoadCqlVector.java new file mode 100644 index 000000000..ead1868e6 --- /dev/null +++ b/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/to_cqlvector/LoadCqlVector.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.datamappers.functions.to_cqlvector; + +import com.datastax.oss.driver.api.core.data.CqlVector; +import io.nosqlbench.virtdata.api.annotations.Categories; +import io.nosqlbench.virtdata.api.annotations.Category; +import io.nosqlbench.virtdata.api.annotations.Example; +import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper; +import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState; + +import java.util.HashMap; +import java.util.function.Function; + +@Categories(Category.state) +@ThreadSafeMapper +public class LoadCqlVector implements Function { + + private final String name; + private final Function nameFunc; + private final com.datastax.oss.driver.api.core.data.CqlVector defaultValue; + + @Example({"LoadDouble('foo')","for the current thread, load a double value from the named variable."}) + public LoadCqlVector(String name) { + this.name = name; + this.nameFunc=null; + this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(0.0f); + } + + @Example({"LoadDouble('foo',23D)","for the current thread, load a double value from the named variable," + + "or the default value if the named variable is not defined."}) + public LoadCqlVector(String name, int len) { + this.name = name; + this.nameFunc=null; + Double[] ary = new Double[len]; + for (int i = 0; i < len; i++) { + ary[i]=(double)i; + } + this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary); + } + + @Override + public com.datastax.oss.driver.api.core.data.CqlVector apply(Object o) { + HashMap map = SharedState.tl_ObjectMap.get(); + String varname=(nameFunc!=null) ? String.valueOf(nameFunc.apply(o)) : name; + Object value = map.getOrDefault(varname, defaultValue); + if (value instanceof CqlVector cqlvector) { + return cqlvector; + } else if (value instanceof float[] fa) { + Float[] ary = new Float[fa.length]; + for (int i = 0; i < fa.length; i++) { + ary[i]=fa[i]; + } + return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary); + } else if (value instanceof double[] da) { + Double[] ary = new Double[da.length]; + for (int i = 0; i < da.length; i++) { + ary[i]=da[i]; + } + return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary); + } else { + return (com.datastax.oss.driver.api.core.data.CqlVector) value; + } + + } +} diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/engine/extensions/vectormath/CqlUtils.java b/adapter-cqld4/src/main/java/io/nosqlbench/engine/extensions/vectormath/CqlUtils.java index ddcc692b4..be26e682c 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/engine/extensions/vectormath/CqlUtils.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/engine/extensions/vectormath/CqlUtils.java @@ -45,5 +45,4 @@ public class CqlUtils extends NBBaseComponent { return rows.stream().mapToInt(r -> Integer.parseInt(Objects.requireNonNull(r.getString(fieldName)))).toArray(); } - } diff --git a/adapter-http/src/main/java/io/nosqlbench/adapter/http/JsonElementUtils.java b/adapter-http/src/main/java/io/nosqlbench/adapter/http/JsonElementUtils.java index d4423dc7e..19917e11f 100644 --- a/adapter-http/src/main/java/io/nosqlbench/adapter/http/JsonElementUtils.java +++ b/adapter-http/src/main/java/io/nosqlbench/adapter/http/JsonElementUtils.java @@ -2,13 +2,13 @@ package io.nosqlbench.adapter.http; /* * Copyright (c) 2022 nosqlbench - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import java.sql.Array; import java.util.ArrayList; import java.util.List; @@ -61,6 +62,36 @@ public class JsonElementUtils { i++; } return keys; - } + + public static List customNumberArrayToFloatList(JsonElement element) { + JsonObject o1 = element.getAsJsonObject(); + JsonElement data = o1.get("data"); + JsonArray dary = data.getAsJsonArray(); + JsonElement element0 = dary.get(0); + JsonObject eobj1 = element0.getAsJsonObject(); + JsonElement embedding = eobj1.get("embedding"); + JsonArray ary = embedding.getAsJsonArray(); + ArrayList list = new ArrayList<>(ary.size()); + for (JsonElement jsonElement : ary) { + list.add(jsonElement.getAsFloat()); + } + return list; + } + + public static float[] customNumberArrayToFloatArray(JsonElement element) { + JsonObject o1 = element.getAsJsonObject(); + JsonElement data = o1.get("data"); + JsonArray dary = data.getAsJsonArray(); + JsonElement element0 = dary.get(0); + JsonObject eobj1 = element0.getAsJsonObject(); + JsonElement embedding = eobj1.get("embedding"); + JsonArray ary = embedding.getAsJsonArray(); + float[] floats = new float[ary.size()]; + for (int i = 0; i < floats.length; i++) { + floats[i]=ary.get(i).getAsFloat(); + } + return floats; + } + } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java index 7ad32521b..dd7725617 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java @@ -512,7 +512,9 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener RunState maxState = state.getMaxState(); activity.setRunState(maxState); if (maxState == RunState.Errored) { - throw new RuntimeException("Error while waiting for activity completion" + (this.exception!=null ? this.exception.toString() : "")); + throw new RuntimeException("Error while waiting for activity completion with states [" + tally.toString() + "], error=" + (this.exception!=null ? + this.exception.toString() : "[no error on activity executor]")); + } } diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/ConfigModel.java b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/ConfigModel.java index 7da373dce..27fbef95e 100644 --- a/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/ConfigModel.java +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/ConfigModel.java @@ -236,7 +236,7 @@ public class ConfigModel implements NBConfigModel { private ConfigModel expand(ConfigModel configModel, Map config) { List> expanders = configModel.params.stream() - .filter(p -> p.getExpander()!=null).toList(); + .filter(p -> p.getExpander() != null).toList(); for (Param expandingParameter : expanders) { for (String name : expandingParameter.getNames()) { @@ -294,16 +294,23 @@ public class ConfigModel implements NBConfigModel { // For each provided configuration element ... for (String configkey : config.keySet()) { Param element = this.paramsByName.get(configkey); + String warning = "Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName() + + ", possible parameter names are " + this.paramsByName.keySet() + "."; if (element == null) { - StringBuilder paramhelp = new StringBuilder( - "Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName() - + ", possible parameter names are " + this.paramsByName.keySet() + "." - ); + String warnonly = System.getenv("NB_CONFIG_WARNINGS_ONLY"); + if (warnonly != null) { + System.out.println("WARNING: " + warning); + } else { + StringBuilder paramhelp = new StringBuilder( + "Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName() + + ", possible parameter names are " + this.paramsByName.keySet() + "." + ); - ConfigSuggestions.getForParam(this, configkey) - .ifPresent(suggestion -> paramhelp.append(" ").append(suggestion)); + ConfigSuggestions.getForParam(this, configkey) + .ifPresent(suggestion -> paramhelp.append(" ").append(suggestion)); - throw new BasicError(paramhelp.toString()); + throw new BasicError(paramhelp.toString()); + } } Object value = config.get(configkey); } diff --git a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/StringCompositor.java b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/StringCompositor.java index e2f35f285..46850ed72 100644 --- a/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/StringCompositor.java +++ b/virtdata-api/src/main/java/io/nosqlbench/virtdata/core/templates/StringCompositor.java @@ -61,12 +61,12 @@ public class StringCompositor implements LongFunction { spans[spans.length - 1] = even_odd_spans[even_odd_spans.length - 1]; this.stringfunc = stringfunc; - int minsize = 0; - for (int i = 0; i < 100; i++) { - String result = apply(i); - minsize = Math.max(minsize, result.length()); - } - bufsize = minsize * 2; +// int minsize = 0; +// for (int i = 0; i < 100; i++) { +// String result = apply(i); +// minsize = Math.max(minsize,result.length()); +// } + bufsize = spans.length*1024; } public StringCompositor(ParsedTemplateString template, Map fconfig) { @@ -78,7 +78,9 @@ public class StringCompositor implements LongFunction { StringBuilder sb = new StringBuilder(bufsize); String[] ary = new String[mappers.length]; for (int i = 0; i < ary.length; i++) { - ary[i] = stringfunc.apply(mappers[i].apply(value)); + DataMapper mapperType = mappers[i]; + Object object = mapperType.apply(value); + ary[i] = stringfunc.apply(object); } for (int i = 0; i < LUT.length; i++) { sb.append(spans[i]).append(ary[LUT[i]]); diff --git a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java index a43e2a457..53b511a74 100644 --- a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java +++ b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java @@ -91,6 +91,14 @@ public class DirectoryLinesStable implements LongFunction { this.totalSize = accumulator; } + private String getLine(int index, int offset) { + try { + IntFunction func = fileFunctions.get(index); + return func.apply(offset); + } catch (Exception e) { + throw new RuntimeException("Error while binding index=" + index + " offset=" + offset + " for " + this); + } + } @Override public synchronized String apply(long cycle) { int value = (int) (cycle % totalSize); @@ -100,9 +108,7 @@ public class DirectoryLinesStable implements LongFunction { index++; } - IntFunction func = fileFunctions.get(index); - return func.apply(value); - + return this.getLine(index,value); } @Override diff --git a/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java index aebf242c0..c5aa2b1a9 100644 --- a/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java +++ b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java @@ -38,5 +38,16 @@ public class DirectoryLinesStableTest { assertThat(directoryLines.apply(Long.MAX_VALUE)).isEqualTo("data2.txt-line3"); } + @Test + public void testOverRangeIssue() { + DirectoryLinesStable directoryLines = new DirectoryLinesStable( + "../local/testdirlines", ".+jsonl" + ); + for (long i = 0; i < 40000; i++) { + String result = directoryLines.apply(i); + } + + } + }