more updates

This commit is contained in:
Jonathan Shook 2024-02-26 13:58:04 -06:00
parent 5be14f1c83
commit 6116cb8d2e
8 changed files with 354 additions and 32 deletions

View File

@@ -0,0 +1,97 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import io.nosqlbench.adapter.cqld4.optionhelpers.BatchTypeEnum;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlBatchStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.jetbrains.annotations.NotNull;
import java.util.function.LongFunction;
public class CqlD4BatchStmtDispenser extends Cqld4BaseOpDispenser {
private final int repeat;
private final ParsedOp subop;
private final OpMapper submapper;
private LongFunction<Statement> opfunc;
public CqlD4BatchStmtDispenser(
DriverAdapter adapter,
LongFunction<CqlSession> sessionFunc,
ParsedOp op,
int repeat,
ParsedOp subop,
OpDispenser<? extends Cqld4CqlOp> subopDispenser
) {
super(adapter, sessionFunc, op);
this.repeat = repeat;
this.subop = subop;
this.submapper = adapter.getOpMapper();
// the sub-op dispenser passed in by the mapper is used directly to build the batch function
this.opfunc = createStmtFunc(op, subopDispenser);
}
private LongFunction<Statement> createStmtFunc(ParsedOp topOp, OpDispenser<? extends Cqld4CqlOp> subopDispenser) {
Cqld4CqlOp exampleOp = subopDispenser.apply(0L);
Statement<?> example = exampleOp.getStmt();
if (!(example instanceof BatchableStatement<?> b)) {
throw new RuntimeException("Statement type '" + example.getClass().getCanonicalName() + " is not " +
"batchable. query=" + exampleOp.getQueryString());
}
BatchTypeEnum bte = topOp.getEnumFromFieldOr(BatchTypeEnum.class, BatchTypeEnum.unlogged, "batchtype");
LongFunction<BatchStatementBuilder> bsbf = l -> new BatchStatementBuilder(bte.batchtype);
LongFunction<Statement> bsf = getBatchAccumulator(bsbf, subopDispenser);
bsf = getEnhancedStmtFunc(bsf,topOp);
return bsf;
}
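// Accumulator: for each cycle l, draws `repeat` statements from the sub-op dispenser at
// offsets l, l+1, ... and adds them to a single BatchStatementBuilder before building.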
@NotNull
private LongFunction<Statement> getBatchAccumulator(LongFunction<BatchStatementBuilder> bsb, OpDispenser<? extends Cqld4CqlOp> subopDispenser) {
LongFunction<BatchStatementBuilder> f = l -> {
BatchStatementBuilder bsa = bsb.apply(l);
for (int i = 0; i < repeat; i++) {
Cqld4CqlOp op = subopDispenser.apply(i+l);
BatchableStatement<?> stmt = (BatchableStatement<?>) op.getStmt();
bsa= bsa.addStatement(stmt);
}
return bsa;
};
LongFunction<Statement> bsf = (long l) -> f.apply(l).build();
return bsf;
}
@Override
public Cqld4CqlOp apply(long value) {
Statement bstmt = opfunc.apply(value);
return new Cqld4CqlBatchStatement(
getSessionFunc().apply(value),
(BatchStatement) bstmt,
getMaxPages(),
getMaxLwtRetries(),
isRetryReplace(),
this
);
}
}
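For orientation, a minimal standalone sketch (not part of this commit; the CQL text and values are illustrative stand-ins for what the sub-op dispenser would emit) of the batch that the accumulator above assembles for one cycle, using only the driver types already imported here:

import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

public class BatchSketch {
    public static BatchStatement buildBatch(long cycle, int repeat) {
        // one builder per cycle, `repeat` batchable statements added, then built once
        BatchStatementBuilder builder = new BatchStatementBuilder(BatchType.UNLOGGED);
        for (int i = 0; i < repeat; i++) {
            // stand-in for subopDispenser.apply(cycle + i).getStmt()
            SimpleStatement stmt = SimpleStatement.newInstance(
                "insert into baselines.keyvalue (key,value) values (?,?)",
                String.valueOf(cycle + i), String.valueOf(i));
            builder.addStatement(stmt);
        }
        return builder.build();
    }
}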

View File

@@ -17,24 +17,15 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Processors;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4BatchStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlBatchStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.config.params.ParamsParser;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.virtdata.core.templates.ParsedTemplateString;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class CqlD4BatchStmtMapper implements OpMapper<Cqld4CqlOp> {
@@ -52,28 +43,32 @@ public class CqlD4BatchStmtMapper implements OpMapper<Cqld4CqlOp> {
public OpDispenser<Cqld4CqlOp> apply(ParsedOp op) {
ParsedOp subop = op.getAsSubOp("op_template");
int repeat = op.getStaticValue("repeat");
OpMapper<Cqld4CqlOp> subopMapper = adapter.getOpMapper();
OpDispenser<? extends Cqld4CqlOp> subopDispenser = subopMapper.apply(subop);
return new CqlD4BatchStmtDispenser(adapter, sessionFunc, op,repeat, subop, subopDispenser);
// ParsedTemplateString stmtTpl = op.getAsTemplate(target.field).orElseThrow(() -> new BasicError(
// "No statement was found in the op template:" + op
// ));
//
// RSProcessors processors = new RSProcessors();
// if (stmtTpl.getCaptures().size()>0) {
// processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
// }
//
// Optional<List> processorList = op.getOptionalStaticConfig("processors", List.class);
//
// processorList.ifPresent(l -> {
// l.forEach(m -> {
// Map<String, String> pconfig = ParamsParser.parseToMap(m, "type");
// ResultSetProcessor processor = Cqld4Processors.resolve(pconfig);
// processors.add(() -> processor);
// });
// });
//
// return new Cqld4PreparedStmtDispenser(adapter, sessionFunc, op, stmtTpl, processors);
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.optionhelpers;
import com.datastax.oss.driver.api.core.cql.BatchType;
public enum BatchTypeEnum {
logged(BatchType.LOGGED),
unlogged(BatchType.UNLOGGED),
counter(BatchType.COUNTER);
public final BatchType batchtype;
BatchTypeEnum(BatchType batchtype) {
this.batchtype = batchtype;
}
}
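A small usage fragment (illustrative, not part of this commit): the dispenser above resolves this enum from the op's batchtype field with getEnumFromFieldOr, defaulting to unlogged, and passes the wrapped driver constant to the BatchStatementBuilder:

// e.g. for an op template carrying `batchtype: counter`
BatchTypeEnum bte = BatchTypeEnum.valueOf("counter");
BatchStatementBuilder builder = new BatchStatementBuilder(bte.batchtype); // wraps BatchType.COUNTER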

View File

@@ -0,0 +1,89 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.datamappers.functions.to_cqlvector;
import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongFunction;
@Categories(Category.state)
@ThreadSafeMapper
public class LoadCqlVectorFromArray implements LongFunction<CqlVector> {
private final String name;
private final Function<Object, Object> nameFunc;
private final CqlVector[] defaultValue;
private final int len;
public LoadCqlVectorFromArray(String name, int len) {
this.name = name;
this.nameFunc = null;
Float[] ary = new Float[len];
for (int i = 0; i < len; i++) {
ary[i] = (float)i;
}
this.defaultValue = new CqlVector[]{CqlVector.newInstance(ary)};
this.len = len;
}
@Override
public CqlVector apply(long cycle) {
int offset = (int) (cycle % len);
HashMap<String, Object> map = SharedState.tl_ObjectMap.get();
String varname = (nameFunc != null) ? String.valueOf(nameFunc.apply(cycle)) : name;
Object object = map.getOrDefault(varname, defaultValue);
if (object.getClass().isArray()) {
object = Array.get(object,offset);
} else if (object instanceof Double[][] dary) {
object = dary[offset];
} else if (object instanceof Float[][] fary) {
object = fary[offset];
} else if (object instanceof CqlVector[] cary) {
object = cary[offset];
} else if (object instanceof List list) {
object = list.get(offset);
} else {
throw new RuntimeException("Unrecognized type for ary of ary:" + object.getClass().getCanonicalName());
}
if (object instanceof CqlVector<?> cqlvector) {
return cqlvector;
} else if (object instanceof float[] fa) {
Float[] ary = new Float[fa.length];
for (int i = 0; i < fa.length; i++) {
ary[i] = fa[i];
}
return CqlVector.newInstance(ary);
} else if (object instanceof double[] da) {
Double[] ary = new Double[da.length];
for (int i = 0; i < da.length; i++) {
ary[i] = da[i];
}
return CqlVector.newInstance(ary);
} else {
return (CqlVector) object;
}
}
}
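A usage sketch (illustrative, not part of this commit): the function looks up a thread-local variable by name and converts the element at cycle % len into a CqlVector, so seeding the SharedState map directly shows the round trip:

import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;

// seed the thread-local object map that LoadCqlVectorFromArray reads from
SharedState.tl_ObjectMap.get().put("embeddings",
    new float[][]{{0.1f, 0.2f}, {0.3f, 0.4f}, {0.5f, 0.6f}});

LoadCqlVectorFromArray load = new LoadCqlVectorFromArray("embeddings", 3);
CqlVector<?> v = load.apply(4L); // 4 % 3 == 1 -> CqlVector of [0.3, 0.4]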

View File

@@ -9,11 +9,17 @@ description: |
population are replaced with new values which never repeat. During the main phase, random partitions are selected for
upsert, with row values never repeating.
TEMPLATE(batchsize,100)
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
batch:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup_batch cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
astra:
schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
@@ -26,6 +32,7 @@ scenarios:
bindings:
seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String
seq_value: Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
batch_seq_value: Mul(TEMPLATE(batchsize,100)L); Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
rw_key: TEMPLATE(keydist,Uniform(0,1000000000)); ToString() -> String
rw_value: Hash(); TEMPLATE(valdist,Uniform(0,1000000000)); ToString() -> String
@@ -62,6 +69,18 @@ blocks:
insert into TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue)
(key, value)
values ({seq_key},{seq_value});
rampup_batch:
params:
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
ops:
rampup_insert:
batch: testing
repeat: 100
op_template:
prepared: |
insert into TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue)
(key, value)
values ({seq_key},{seq_value});
verify:
params:
cl: TEMPLATE(read_cl,LOCAL_QUORUM)

View File

@@ -140,6 +140,13 @@ ops:
raw: |
create table if not exists {ksname}.{tblname} ...
example-batch-stmt:
batch:
repeat: 50
op_template:
prepared: |
select three, four from knock.onthedoor where ...
# gremlin statement using the fluent API, as it would be written in a client application
example-fluent-graph-stmt:
fluent: >-

View File

@@ -94,4 +94,24 @@ public class JsonElementUtils {
return floats;
}
public static float[][] customNumberArrayToFloatArrayBatch(JsonElement element) {
JsonObject o1 = element.getAsJsonObject();
JsonElement data = o1.get("data");
JsonArray dary = data.getAsJsonArray();
float[][] floats2dary = new float[dary.size()][];
for (int vector_idx = 0; vector_idx < dary.size(); vector_idx++) {
JsonElement element0 = dary.get(vector_idx);
JsonObject eobj1 = element0.getAsJsonObject();
JsonElement embedding = eobj1.get("embedding");
JsonArray ary = embedding.getAsJsonArray();
float[] newV = new float[ary.size()];
for (int component_idx = 0; component_idx < ary.size(); component_idx++) {
newV[component_idx]=ary.get(component_idx).getAsFloat();
}
floats2dary[vector_idx]=newV;
}
return floats2dary;
}
}
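A usage sketch (illustrative, not part of this commit), assuming a document shaped like the one this method reads, i.e. an object whose data array holds entries that each carry an embedding array:

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

String json = """
    {"data": [
      {"embedding": [0.1, 0.2, 0.3]},
      {"embedding": [0.4, 0.5, 0.6]}
    ]}""";
JsonElement element = JsonParser.parseString(json);
float[][] vectors = JsonElementUtils.customNumberArrayToFloatArrayBatch(element);
// vectors[0] -> {0.1f, 0.2f, 0.3f}; vectors[1] -> {0.4f, 0.5f, 0.6f}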

View File

@@ -1 +1,64 @@
["{ \" id \" : \" 0704. 0001 \", \" title \" : \" calculation of prompt diphoton production cross sections at tevatron and \\ n lhc energies \", \" abstract \" : \" a fully differential calculation in perturbative quantum chromodynamics is \\ npresented for the production of massive photon pairs at hadron colliders. all \\ nnext - to - leading order perturbative contributions from quark - antiquark, \\ ngluon - ( anti ) quark, and gluon - gluon subprocesses are included, as well as \\ nall - orders resummation of initial - state gluon radiation valid at \\ nnext - to - next - to - leading logarithmic accuracy. the region of phase space is \\ nspecified in which the calculation is most reliable. good agreement is \\ ndemonstrated with data from the fermilab tevatron, and predictions are made for \\ nmore detailed tests with cdf and do data. predictions are shown for \\ ndistributions of diphoton pairs produced at the energy of the large hadron \\ ncollider ( lhc ). distributions of the diphoton pairs from the decay of a higgs \\ nboson are contrasted with those produced from qcd processes at the lhc, showing \\ nthat enhanced sensitivity to the signal can be obtained with judicious \\ nselection of events. \\ n \", \" categories \" : \" hep - ph \" } { \" id \" : \" 0704. 0002 \", \" title \" : \" sparsity - certifying graph decompositions \", \" abstract \" : \" we describe a new algorithm, the $ ( k, \\ \\ ell ) $ - pebble game with colors, and use \\ nit obtain a characterization of the family of $ ( k, \\ \\ ell ) $ - sparse graphs and \\ nalgorithmic solutions to a family of problems concerning tree decompositions of \\ ngraphs. special instances of sparse graphs appear in rigidity theory and have \\ nreceived increased attention in recent years. in particular, our colored \\ npebbles generalize and strengthen the previous results of lee and streinu and \\ ngive a new proof of the tutte - nash - williams characterization of arboricity",". we \\ nalso present a new decomposition that certifies sparsity based on the \\ n $ ( k, \\ \\ ell ) $ - pebble game with colors. our work also exposes connections between \\ npebble game algorithms and previous sparse graph algorithms by gabow, gabow and \\ nwestermann and hendrickson. \\ n \", \" categories \" : \" math. co cs. cg \" } { \" id \" : \" 0704. 0003 \", \" title \" : \" the evolution of the earth - moon system based on the dark matter field \\ n fluid model \", \" abstract \" : \" the evolution of earth - moon system is described by the dark matter field \\ nfluid model proposed in the meeting of division of particle and field 2004, \\ namerican physical society. the current behavior of the earth - moon system agrees \\ nwith this model very well and the general pattern of the evolution of the \\ nmoon - earth system described by this model agrees with geological and fossil \\ nevidence. the closest distance of the moon to earth was about 259000 km at 4. 5 \\ nbillion years ago, which is far beyond the roche's limit. the result suggests \\ nthat the tidal friction may not be the primary cause for the evolution of the \\ nearth - moon system. the average dark matter field fluid constant derived from \\ nearth - moon system data is 4. 39 x 10 ^ ( - 22 ) s ^ ( - 1 ) m ^ ( - 1 ). this model predicts \\ nthat the mars's rotation is also slowing with the angular acceleration rate \\ nabout - 4. 38 x 10 ^ ( - 22 ) rad s ^ ( - 2 ). \\ n \", \" categories \" : \" physics. gen - ph \" } { \" id \" : \" 0704. 
0004 \", \" title \" : \" a determinant of stirling cycle numbers counts unlabeled acyclic \\ n single - source automata \", \" abstract \" : \" we show that a determinant of stirling cycle numbers counts unlabeled acyclic \\ nsingle - source automata. the proof involves a bijection from these automata to \\ ncertain marked lattice paths and a sign - reversing involution to evaluate the \\ ndeterminant. \\ n"]
View File

/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.library.basics.shared.from_long.to_string;
import io.nosqlbench.virtdata.library.basics.shared.conversions.from_any.ToJSONFPretty;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_collection.ListSized;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_collection.ListSizedStepped;
import org.junit.jupiter.api.Test;
import java.util.function.LongFunction;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
public class ConcatArrayTest {
@Test
public void testConcatArray() {
ConcatArray ca = new ConcatArray(",", 2, "{\n{}\n}", new NumberNameToString());
assertThat(ca.apply(3L)).isEqualTo("""
{
three,four
}""");
}
@Test
public void testConcatArrayJson() {
ToJSONFPretty jsonlist = new ToJSONFPretty(
(LongFunction) new ListSizedStepped(4,
new NumberNameToString()));
ConcatArray ca = new ConcatArray(
",", 2, "{\n{}\n}", jsonlist);
assertThat(ca.apply(3L)).isEqualTo("""
{
[
"three",
"four",
"five",
"six"
],[
"four",
"five",
"six",
"seven"
]
}""");
}
}