mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
customizations which need to be generalized before merge
This commit is contained in:
parent
74aac34145
commit
f1f6b3336c
@ -0,0 +1,79 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022-2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||||
|
|
||||||
|
import com.datastax.oss.driver.api.core.CqlSession;
|
||||||
|
import io.nosqlbench.adapter.cqld4.Cqld4Processors;
|
||||||
|
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||||
|
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
|
||||||
|
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
|
||||||
|
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||||
|
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
|
||||||
|
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||||
|
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||||
|
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||||
|
import io.nosqlbench.nb.api.config.params.ParamsParser;
|
||||||
|
import io.nosqlbench.nb.api.errors.BasicError;
|
||||||
|
import io.nosqlbench.virtdata.core.templates.ParsedTemplateString;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
public class CqlD4BatchStmtMapper implements OpMapper<Cqld4CqlOp> {
|
||||||
|
|
||||||
|
private final LongFunction<CqlSession> sessionFunc;
|
||||||
|
private final TypeAndTarget<CqlD4OpType, String> target;
|
||||||
|
private final DriverAdapter adapter;
|
||||||
|
|
||||||
|
public CqlD4BatchStmtMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType,String> target) {
|
||||||
|
this.sessionFunc=sessionFunc;
|
||||||
|
this.target = target;
|
||||||
|
this.adapter = adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OpDispenser<Cqld4CqlOp> apply(ParsedOp op) {
|
||||||
|
|
||||||
|
ParsedOp subop = op.getAsSubOp("op_template");
|
||||||
|
int repeat = subop.getStaticConfigOr("repeat", 1);
|
||||||
|
|
||||||
|
ParsedTemplateString stmtTpl = op.getAsTemplate(target.field).orElseThrow(() -> new BasicError(
|
||||||
|
"No statement was found in the op template:" + op
|
||||||
|
));
|
||||||
|
|
||||||
|
RSProcessors processors = new RSProcessors();
|
||||||
|
if (stmtTpl.getCaptures().size()>0) {
|
||||||
|
processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Optional<List> processorList = op.getOptionalStaticConfig("processors", List.class);
|
||||||
|
|
||||||
|
processorList.ifPresent(l -> {
|
||||||
|
l.forEach(m -> {
|
||||||
|
Map<String, String> pconfig = ParamsParser.parseToMap(m, "type");
|
||||||
|
ResultSetProcessor processor = Cqld4Processors.resolve(pconfig);
|
||||||
|
processors.add(() -> processor);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return new Cqld4PreparedStmtDispenser(adapter, sessionFunc, op, stmtTpl, processors);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -43,6 +43,14 @@ public enum CqlD4OpType {
|
|||||||
*/
|
*/
|
||||||
prepared,
|
prepared,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows for a statement template to be used to create a batch statement.
|
||||||
|
* The fields 'op_template', and 'repeat' are required, and all fields below
|
||||||
|
* the op_template field are a nested version of the other op types here, but
|
||||||
|
* supports only the simple and prepared forms for historic compatibility reasons.
|
||||||
|
*/
|
||||||
|
batch,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* uses {@link com.datastax.dse.driver.api.core.graph.ScriptGraphStatement}
|
* uses {@link com.datastax.dse.driver.api.core.graph.ScriptGraphStatement}
|
||||||
* This is the "raw" mode of using gremlin. It is not as efficient, and thus
|
* This is the "raw" mode of using gremlin. It is not as efficient, and thus
|
||||||
|
@ -72,6 +72,7 @@ public class Cqld4CoreOpMapper implements OpMapper<Op> {
|
|||||||
case raw -> new CqlD4RawStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
case raw -> new CqlD4RawStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||||
case simple -> new CqlD4CqlSimpleStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
case simple -> new CqlD4CqlSimpleStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||||
case prepared -> new CqlD4PreparedStmtMapper(adapter, sessionFunc, target).apply(op);
|
case prepared -> new CqlD4PreparedStmtMapper(adapter, sessionFunc, target).apply(op);
|
||||||
|
case batch -> new CqlD4BatchStmtMapper(adapter, sessionFunc, target).apply(op);
|
||||||
case gremlin -> new Cqld4GremlinOpMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
case gremlin -> new Cqld4GremlinOpMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||||
case fluent -> new Cqld4FluentGraphOpMapper(adapter, sessionFunc, target).apply(op);
|
case fluent -> new Cqld4FluentGraphOpMapper(adapter, sessionFunc, target).apply(op);
|
||||||
case rainbow -> new CqlD4RainbowTableMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
case rainbow -> new CqlD4RainbowTableMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||||
|
@ -0,0 +1,80 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.datamappers.functions.to_cqlvector;
|
||||||
|
|
||||||
|
import com.datastax.oss.driver.api.core.data.CqlVector;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Categories;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Category;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Example;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
|
||||||
|
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@Categories(Category.state)
|
||||||
|
@ThreadSafeMapper
|
||||||
|
public class LoadCqlVector implements Function<Object,com.datastax.oss.driver.api.core.data.CqlVector> {
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final Function<Object,Object> nameFunc;
|
||||||
|
private final com.datastax.oss.driver.api.core.data.CqlVector defaultValue;
|
||||||
|
|
||||||
|
@Example({"LoadDouble('foo')","for the current thread, load a double value from the named variable."})
|
||||||
|
public LoadCqlVector(String name) {
|
||||||
|
this.name = name;
|
||||||
|
this.nameFunc=null;
|
||||||
|
this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(0.0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Example({"LoadDouble('foo',23D)","for the current thread, load a double value from the named variable," +
|
||||||
|
"or the default value if the named variable is not defined."})
|
||||||
|
public LoadCqlVector(String name, int len) {
|
||||||
|
this.name = name;
|
||||||
|
this.nameFunc=null;
|
||||||
|
Double[] ary = new Double[len];
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
ary[i]=(double)i;
|
||||||
|
}
|
||||||
|
this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public com.datastax.oss.driver.api.core.data.CqlVector apply(Object o) {
|
||||||
|
HashMap<String, Object> map = SharedState.tl_ObjectMap.get();
|
||||||
|
String varname=(nameFunc!=null) ? String.valueOf(nameFunc.apply(o)) : name;
|
||||||
|
Object value = map.getOrDefault(varname, defaultValue);
|
||||||
|
if (value instanceof CqlVector<?> cqlvector) {
|
||||||
|
return cqlvector;
|
||||||
|
} else if (value instanceof float[] fa) {
|
||||||
|
Float[] ary = new Float[fa.length];
|
||||||
|
for (int i = 0; i < fa.length; i++) {
|
||||||
|
ary[i]=fa[i];
|
||||||
|
}
|
||||||
|
return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
|
||||||
|
} else if (value instanceof double[] da) {
|
||||||
|
Double[] ary = new Double[da.length];
|
||||||
|
for (int i = 0; i < da.length; i++) {
|
||||||
|
ary[i]=da[i];
|
||||||
|
}
|
||||||
|
return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
|
||||||
|
} else {
|
||||||
|
return (com.datastax.oss.driver.api.core.data.CqlVector) value;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -45,5 +45,4 @@ public class CqlUtils extends NBBaseComponent {
|
|||||||
return rows.stream().mapToInt(r -> Integer.parseInt(Objects.requireNonNull(r.getString(fieldName)))).toArray();
|
return rows.stream().mapToInt(r -> Integer.parseInt(Objects.requireNonNull(r.getString(fieldName)))).toArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import com.google.gson.JsonArray;
|
|||||||
import com.google.gson.JsonElement;
|
import com.google.gson.JsonElement;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
|
|
||||||
|
import java.sql.Array;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -61,6 +62,36 @@ public class JsonElementUtils {
|
|||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
return keys;
|
return keys;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<Float> customNumberArrayToFloatList(JsonElement element) {
|
||||||
|
JsonObject o1 = element.getAsJsonObject();
|
||||||
|
JsonElement data = o1.get("data");
|
||||||
|
JsonArray dary = data.getAsJsonArray();
|
||||||
|
JsonElement element0 = dary.get(0);
|
||||||
|
JsonObject eobj1 = element0.getAsJsonObject();
|
||||||
|
JsonElement embedding = eobj1.get("embedding");
|
||||||
|
JsonArray ary = embedding.getAsJsonArray();
|
||||||
|
ArrayList<Float> list = new ArrayList<>(ary.size());
|
||||||
|
for (JsonElement jsonElement : ary) {
|
||||||
|
list.add(jsonElement.getAsFloat());
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static float[] customNumberArrayToFloatArray(JsonElement element) {
|
||||||
|
JsonObject o1 = element.getAsJsonObject();
|
||||||
|
JsonElement data = o1.get("data");
|
||||||
|
JsonArray dary = data.getAsJsonArray();
|
||||||
|
JsonElement element0 = dary.get(0);
|
||||||
|
JsonObject eobj1 = element0.getAsJsonObject();
|
||||||
|
JsonElement embedding = eobj1.get("embedding");
|
||||||
|
JsonArray ary = embedding.getAsJsonArray();
|
||||||
|
float[] floats = new float[ary.size()];
|
||||||
|
for (int i = 0; i < floats.length; i++) {
|
||||||
|
floats[i]=ary.get(i).getAsFloat();
|
||||||
|
}
|
||||||
|
return floats;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@ -512,7 +512,9 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
|
|||||||
RunState maxState = state.getMaxState();
|
RunState maxState = state.getMaxState();
|
||||||
activity.setRunState(maxState);
|
activity.setRunState(maxState);
|
||||||
if (maxState == RunState.Errored) {
|
if (maxState == RunState.Errored) {
|
||||||
throw new RuntimeException("Error while waiting for activity completion" + (this.exception!=null ? this.exception.toString() : ""));
|
throw new RuntimeException("Error while waiting for activity completion with states [" + tally.toString() + "], error=" + (this.exception!=null ?
|
||||||
|
this.exception.toString() : "[no error on activity executor]"));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,7 +294,13 @@ public class ConfigModel implements NBConfigModel {
|
|||||||
// For each provided configuration element ...
|
// For each provided configuration element ...
|
||||||
for (String configkey : config.keySet()) {
|
for (String configkey : config.keySet()) {
|
||||||
Param<?> element = this.paramsByName.get(configkey);
|
Param<?> element = this.paramsByName.get(configkey);
|
||||||
|
String warning = "Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName()
|
||||||
|
+ ", possible parameter names are " + this.paramsByName.keySet() + ".";
|
||||||
if (element == null) {
|
if (element == null) {
|
||||||
|
String warnonly = System.getenv("NB_CONFIG_WARNINGS_ONLY");
|
||||||
|
if (warnonly != null) {
|
||||||
|
System.out.println("WARNING: " + warning);
|
||||||
|
} else {
|
||||||
StringBuilder paramhelp = new StringBuilder(
|
StringBuilder paramhelp = new StringBuilder(
|
||||||
"Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName()
|
"Unknown config parameter '" + configkey + "' in config model while configuring " + getOf().getSimpleName()
|
||||||
+ ", possible parameter names are " + this.paramsByName.keySet() + "."
|
+ ", possible parameter names are " + this.paramsByName.keySet() + "."
|
||||||
@ -305,6 +311,7 @@ public class ConfigModel implements NBConfigModel {
|
|||||||
|
|
||||||
throw new BasicError(paramhelp.toString());
|
throw new BasicError(paramhelp.toString());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Object value = config.get(configkey);
|
Object value = config.get(configkey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,12 +61,12 @@ public class StringCompositor implements LongFunction<String> {
|
|||||||
spans[spans.length - 1] = even_odd_spans[even_odd_spans.length - 1];
|
spans[spans.length - 1] = even_odd_spans[even_odd_spans.length - 1];
|
||||||
this.stringfunc = stringfunc;
|
this.stringfunc = stringfunc;
|
||||||
|
|
||||||
int minsize = 0;
|
// int minsize = 0;
|
||||||
for (int i = 0; i < 100; i++) {
|
// for (int i = 0; i < 100; i++) {
|
||||||
String result = apply(i);
|
// String result = apply(i);
|
||||||
minsize = Math.max(minsize, result.length());
|
// minsize = Math.max(minsize,result.length());
|
||||||
}
|
// }
|
||||||
bufsize = minsize * 2;
|
bufsize = spans.length*1024;
|
||||||
}
|
}
|
||||||
|
|
||||||
public StringCompositor(ParsedTemplateString template, Map<String, Object> fconfig) {
|
public StringCompositor(ParsedTemplateString template, Map<String, Object> fconfig) {
|
||||||
@ -78,7 +78,9 @@ public class StringCompositor implements LongFunction<String> {
|
|||||||
StringBuilder sb = new StringBuilder(bufsize);
|
StringBuilder sb = new StringBuilder(bufsize);
|
||||||
String[] ary = new String[mappers.length];
|
String[] ary = new String[mappers.length];
|
||||||
for (int i = 0; i < ary.length; i++) {
|
for (int i = 0; i < ary.length; i++) {
|
||||||
ary[i] = stringfunc.apply(mappers[i].apply(value));
|
DataMapper<?> mapperType = mappers[i];
|
||||||
|
Object object = mapperType.apply(value);
|
||||||
|
ary[i] = stringfunc.apply(object);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < LUT.length; i++) {
|
for (int i = 0; i < LUT.length; i++) {
|
||||||
sb.append(spans[i]).append(ary[LUT[i]]);
|
sb.append(spans[i]).append(ary[LUT[i]]);
|
||||||
|
@ -91,6 +91,14 @@ public class DirectoryLinesStable implements LongFunction<String> {
|
|||||||
this.totalSize = accumulator;
|
this.totalSize = accumulator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getLine(int index, int offset) {
|
||||||
|
try {
|
||||||
|
IntFunction<String> func = fileFunctions.get(index);
|
||||||
|
return func.apply(offset);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("Error while binding index=" + index + " offset=" + offset + " for " + this);
|
||||||
|
}
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public synchronized String apply(long cycle) {
|
public synchronized String apply(long cycle) {
|
||||||
int value = (int) (cycle % totalSize);
|
int value = (int) (cycle % totalSize);
|
||||||
@ -100,9 +108,7 @@ public class DirectoryLinesStable implements LongFunction<String> {
|
|||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
|
|
||||||
IntFunction<String> func = fileFunctions.get(index);
|
return this.getLine(index,value);
|
||||||
return func.apply(value);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -38,5 +38,16 @@ public class DirectoryLinesStableTest {
|
|||||||
assertThat(directoryLines.apply(Long.MAX_VALUE)).isEqualTo("data2.txt-line3");
|
assertThat(directoryLines.apply(Long.MAX_VALUE)).isEqualTo("data2.txt-line3");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOverRangeIssue() {
|
||||||
|
DirectoryLinesStable directoryLines = new DirectoryLinesStable(
|
||||||
|
"../local/testdirlines", ".+jsonl"
|
||||||
|
);
|
||||||
|
for (long i = 0; i < 40000; i++) {
|
||||||
|
String result = directoryLines.apply(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user