From 88457aa9d9de86508662dd625e6e078bd10b7468 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 19 May 2023 15:07:19 +0200 Subject: [PATCH] [venice] Introduce the Writer operation and clean up the metrics - introduce a new WriteOp to write directly to Venice - support AVRO keys for reads and for writes - remove useless metrics --- adapter-venice/pom.xml | 26 ++++++- .../adapter/venice/VeniceOpMapper.java | 2 + .../adapter/venice/VeniceOpType.java | 3 +- .../adapter/venice/VeniceSpace.java | 39 +++++++++-- .../dispensers/ReadSingleKeyOpDispenser.java | 10 ++- .../dispensers/VeniceBaseOpDispenser.java | 22 ++++-- .../venice/dispensers/WriteOpDispenser.java | 68 +++++++++++++++++++ .../adapter/venice/ops/ReadSingleKeyOp.java | 8 +-- .../adapter/venice/ops/WriteOp.java | 65 ++++++++++++++++++ .../adapter/venice/util/AvroUtils.java | 57 ++++++++++++++++ .../venice/util/VeniceAdapterMetrics.java | 5 +- .../main/resources/scripts/create-store.sh | 24 +++++++ .../src/main/resources/scripts/download.sh | 18 +++++ .../src/main/resources/scripts/key.avsc | 1 + .../src/main/resources/scripts/prepare.sh | 0 .../src/main/resources/scripts/value.avsc | 1 + adapter-venice/src/main/resources/venice.md | 30 +++++++- .../src/main/resources/venice_reader.yaml | 9 +-- .../src/main/resources/venice_writer.yaml | 12 ++++ .../java/io/nosqlbench/engine/cli/NBCLI.java | 1 + 20 files changed, 371 insertions(+), 30 deletions(-) create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/WriteOpDispenser.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/WriteOp.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/AvroUtils.java create mode 100755 adapter-venice/src/main/resources/scripts/create-store.sh create mode 100755 adapter-venice/src/main/resources/scripts/download.sh create mode 100644 adapter-venice/src/main/resources/scripts/key.avsc create mode 100644 adapter-venice/src/main/resources/scripts/prepare.sh create mode 100644 adapter-venice/src/main/resources/scripts/value.avsc create mode 100644 adapter-venice/src/main/resources/venice_writer.yaml diff --git a/adapter-venice/pom.xml b/adapter-venice/pom.xml index fc893ecf1..1ebf4e68e 100644 --- a/adapter-venice/pom.xml +++ b/adapter-venice/pom.xml @@ -33,7 +33,7 @@ - 0.4.17-alpha-9 + 0.4.17-alpha-12 @@ -74,6 +74,30 @@ + + com.linkedin.venice + venice-producer + ${venice.version} + + + org.sonatype.oss + * + + + org.apache.helix + * + + + org.apache.logging.log4j + * + + + org.conscrypt + * + + + + com.linkedin.venice venice-thin-client diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java index 1dd9094e1..da427b190 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java @@ -61,6 +61,8 @@ public class VeniceOpMapper implements OpMapper { return switch (opType.enumId) { case ReadSingleKey -> new ReadSingleKeyOpDispenser(adapter, op, veniceSpace); + case Write -> + new WriteOpDispenser(adapter, op, veniceSpace); }; } } diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java index 18285b97b..c84b8bac8 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java @@ -18,7 +18,8 @@ package io.nosqlbench.adapter.venice; public enum VeniceOpType { // read a single key - ReadSingleKey + ReadSingleKey, + Write } diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java index ca227105c..37c58ab09 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java @@ -16,9 +16,13 @@ package io.nosqlbench.adapter.venice; +import com.linkedin.venice.authentication.jwt.ClientAuthenticationProviderToken; import com.linkedin.venice.client.store.AvroGenericStoreClient; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; +import com.linkedin.venice.producer.online.OnlineProducerFactory; +import com.linkedin.venice.producer.online.OnlineVeniceProducer; +import com.linkedin.venice.utils.VeniceProperties; import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; @@ -39,9 +43,12 @@ public class VeniceSpace implements AutoCloseable { private long veniceActivityStartTimeMills; private final String token; + private ClientConfig clientConfig; private AvroGenericStoreClient client; + private OnlineVeniceProducer producer; + public VeniceSpace(String spaceName, NBConfiguration cfg) { this.spaceName = spaceName; @@ -71,25 +78,47 @@ public class VeniceSpace implements AutoCloseable { .asReadOnly(); } - public AvroGenericStoreClient getClient() { + public synchronized AvroGenericStoreClient getClient() { + if (client == null) { + client = ClientFactory.getAndStartGenericAvroClient(clientConfig); + } return client; } + public synchronized OnlineVeniceProducer getProducer() { + if (producer == null) { + VeniceProperties properties = VeniceProperties.empty(); + producer = OnlineProducerFactory.createProducer(clientConfig, properties,null); + } + return producer; + } + public void initializeSpace() { - ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName); + this.clientConfig = ClientConfig.defaultGenericClientConfig(storeName); clientConfig.setVeniceURL(routerUrl); clientConfig.setForceClusterDiscoveryAtStartTime(true); - clientConfig.setToken(token); - client = ClientFactory.getAndStartGenericAvroClient(clientConfig); + if (token != null && !token.isEmpty()) { + clientConfig.setAuthenticationProvider(ClientAuthenticationProviderToken.TOKEN(token)); + } } public void shutdownSpace() { try { - client.close(); + if (client != null) { + client.close(); + } } catch (Exception e) { logger.error("Unexpected error when shutting down NB S4J space.", e); } + try { + if (producer != null) { + producer.close(); + } + } + catch (Exception e) { + logger.error("Unexpected error when shutting down NB S4J space.", e); + } } public long getVeniceActivityStartTimeMills() { diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java index 7c4727711..3967940c2 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java @@ -18,8 +18,10 @@ package io.nosqlbench.adapter.venice.dispensers; import io.nosqlbench.adapter.venice.VeniceSpace; import io.nosqlbench.adapter.venice.ops.ReadSingleKeyOp; +import io.nosqlbench.adapter.venice.util.AvroUtils; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.avro.Schema; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -36,6 +38,10 @@ public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser { private final static Logger logger = LogManager.getLogger("ReadSingleKeyOpDispenser"); private final LongFunction keyStrFunc; + private final Schema keySchema; + + private static final String KEY_SCHEMA_OP_PARAM = "keySchema"; + private static final String KEY_OP_PARAM = "key"; public ReadSingleKeyOpDispenser(DriverAdapter adapter, @@ -43,14 +49,16 @@ public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser { VeniceSpace s4jSpace) { super(adapter, op, s4jSpace); this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM); + this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM); } @Override public ReadSingleKeyOp apply(long cycle) { String key = keyStrFunc.apply(cycle); + Object encodedKey = AvroUtils.encodeToAvro(keySchema, key); return new ReadSingleKeyOp( veniceAdapterMetrics, veniceSpace, - key); + encodedKey); } } diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java index 05c600c48..9a0df6865 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java @@ -22,16 +22,11 @@ import io.nosqlbench.adapter.venice.util.*; import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; -import org.apache.commons.lang3.BooleanUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; +import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.*; import java.util.function.LongFunction; -import java.util.function.Predicate; -import java.util.stream.Collectors; public abstract class VeniceBaseOpDispenser extends BaseOpDispenser { @@ -65,5 +60,20 @@ public abstract class VeniceBaseOpDispenser extends BaseOpDispenser keyStrFunc; + private final LongFunction valueStrFunc; + private final Schema keySchema; + + private final Schema valueSchema; + + private static final String KEY_OP_PARAM = "key"; + private static final String VALUE_OP_PARAM = "value"; + + private static final String VALUE_SCHEMA_OP_PARAM = "valueSchema"; + private static final String KEY_SCHEMA_OP_PARAM = "keySchema"; + + + public WriteOpDispenser(DriverAdapter adapter, + ParsedOp op, + VeniceSpace s4jSpace) { + super(adapter, op, s4jSpace); + this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM); + this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM); + this.valueStrFunc = lookupMandtoryStrOpValueFunc(VALUE_OP_PARAM); + this.valueSchema = lookupAvroSchema(VALUE_SCHEMA_OP_PARAM); + } + + @Override + public WriteOp apply(long cycle) { + String key = keyStrFunc.apply(cycle); + String value = valueStrFunc.apply(cycle); + Object encodedKey = AvroUtils.encodeToAvro(keySchema, key); + Object encodedValue = AvroUtils.encodeToAvro(valueSchema, value); + return new WriteOp( + veniceAdapterMetrics, + veniceSpace, + encodedKey, + encodedValue); + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java index 8082098fb..c711009b9 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java @@ -18,6 +18,7 @@ package io.nosqlbench.adapter.venice.ops; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import com.linkedin.venice.client.store.AvroGenericStoreClient; import io.nosqlbench.adapter.venice.VeniceSpace; @@ -33,15 +34,14 @@ public class ReadSingleKeyOp extends VeniceOp { private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp"); private final AvroGenericStoreClient client; - private final String key; + private final Object key; private final Timer executeTimer; - private Counter foundCounter; private Counter notFoundCounter; public ReadSingleKeyOp(VeniceAdapterMetrics veniceAdapterMetrics, VeniceSpace veniceSpace, - String key) { + Object key) { super(veniceAdapterMetrics, veniceSpace); this.client = veniceSpace.getClient(); this.key = key; @@ -57,7 +57,7 @@ public class ReadSingleKeyOp extends VeniceOp { CompletableFuture handle = client.get(key); callValue = handle.join(); if (logger.isDebugEnabled()) { - logger.debug("ReadSingleKeyOp key={} value={}", key, callValue); + logger.debug("ReadSingleKeyOp key={} value={} latency {}", key, callValue); } } if (callValue != null) { diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/WriteOp.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/WriteOp.java new file mode 100644 index 000000000..85e76a7a2 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/WriteOp.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice.ops; + + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import com.linkedin.venice.producer.DurableWrite; +import com.linkedin.venice.producer.online.OnlineVeniceProducer; +import io.nosqlbench.adapter.venice.VeniceSpace; +import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics; +import org.apache.avro.generic.GenericRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +public class WriteOp extends VeniceOp { + + private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp"); + + private final OnlineVeniceProducer producer; + private final Object key; + private final Object value; + private final Timer executeTimer; + + public WriteOp(VeniceAdapterMetrics veniceAdapterMetrics, + VeniceSpace veniceSpace, + Object key, + Object value) { + super(veniceAdapterMetrics, veniceSpace); + this.producer = veniceSpace.getProducer(); + this.key = key; + this.value = value; + this.executeTimer = veniceAdapterMetrics.getExecuteTimer(); + } + + @Override + public Object apply(long value) { + Object callValue; + try (Timer.Context time = executeTimer.time();) { + CompletableFuture handle = producer.asyncPut(this.key, this.value); + callValue = handle.join(); + if (logger.isDebugEnabled()) { + logger.debug("Write key={} value={} res {}", key, callValue, callValue); + } + } + return null; + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/AvroUtils.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/AvroUtils.java new file mode 100644 index 000000000..49864e812 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/AvroUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.nosqlbench.adapter.venice.util; + +import org.apache.avro.Schema; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.JsonDecoder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class AvroUtils { + private static final Logger logger = LogManager.getLogger("AvroUtils"); + + public static org.apache.avro.Schema parseAvroSchema(String avroSchemDef) { + return new org.apache.avro.Schema.Parser().parse(avroSchemDef); + } + + public static Object encodeToAvro(org.apache.avro.Schema schema, String jsonData) { + if (schema.getType() == Schema.Type.STRING) { + return jsonData; + } else if (schema.getType() == Schema.Type.RECORD) { + org.apache.avro.generic.GenericRecord record = null; + + try { + org.apache.avro.generic.GenericDatumReader reader; + reader = new org.apache.avro.generic.GenericDatumReader<>(schema); + + JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData); + + record = reader.read(null, decoder); + } catch (IOException ioe) { + logger.info("Cannot convert JSON {} to AVRO: ", jsonData, ioe); + throw new RuntimeException(ioe); + } + + return record; + } else { + throw new RuntimeException("Unsupported schema + " + schema.getType()+ ", only string and record"); + } + } + +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java index 45760314e..545793362 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java @@ -17,10 +17,9 @@ package io.nosqlbench.adapter.venice.util; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import io.nosqlbench.adapter.venice.dispensers.VeniceBaseOpDispenser; -import io.nosqlbench.api.config.NBLabeledElement; -import io.nosqlbench.api.config.NBLabels; import io.nosqlbench.api.engine.metrics.ActivityMetrics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,7 +50,6 @@ public class VeniceAdapterMetrics { veniceBaseOpDispenser,"execute", ActivityMetrics.DEFAULT_HDRDIGITS); - this.foundCounter = ActivityMetrics.counter( veniceBaseOpDispenser,"found"); @@ -70,5 +68,4 @@ public class VeniceAdapterMetrics { public Counter getNotFoundCounter() { return notFoundCounter; } - } diff --git a/adapter-venice/src/main/resources/scripts/create-store.sh b/adapter-venice/src/main/resources/scripts/create-store.sh new file mode 100755 index 000000000..4602ee150 --- /dev/null +++ b/adapter-venice/src/main/resources/scripts/create-store.sh @@ -0,0 +1,24 @@ +#bin/bash +# This is an utility script to create the store in Venice. +# Use ./download.sh in order to download the binaries needed to run this script. + +set -x -e +HERE=$(realpath $0) +cd $HERE + +jar=binaries/*admin-tool-all*.jar +storeName=$1 +url=http://localhost:5555 +clusterName=venice-cluster0 +keySchema=key.avsc +valueSchema=value.avsc + +# create the store +java -jar $jar --new-store --url $url --cluster $clusterName --store $storeName --key-schema-file $keySchema --value-schema-file $valueSchema --hybrid-data-replication-policy NON_AGGREGATE + + +# enable incremental push and disable read quota +java -jar $jar --update-store --url $url --cluster $clusterName --store $storeName --storage-quota -1 --incremental-push-enabled true --hybrid-data-replication-policy NON_AGGREGATE --read-quota 1000000 + +# create the first version of the store +java -jar $jar --empty-push --url $url --cluster $clusterName --store $storeName --push-id init --store-size 1000 diff --git a/adapter-venice/src/main/resources/scripts/download.sh b/adapter-venice/src/main/resources/scripts/download.sh new file mode 100755 index 000000000..06d1bc098 --- /dev/null +++ b/adapter-venice/src/main/resources/scripts/download.sh @@ -0,0 +1,18 @@ +########## +# This script downloads the binaries needed to run the create-store.sh script. +########## +set -x -e +HERE=$(realpath $(dirname $0)) +VENICETOOLSURL=https://github.com/datastax/venice/releases/download/ds-0.4.17-alpha-12/venice-admin-tool-all.jar +BINDIR=$HERE/binaries +rm -Rf $BINDIR +mkdir $BINDIR +pushd $BINDIR +cd $BINDIR +curl -L -O $AVROTOOLSURL +popd + + + + + diff --git a/adapter-venice/src/main/resources/scripts/key.avsc b/adapter-venice/src/main/resources/scripts/key.avsc new file mode 100644 index 000000000..4da502a0b --- /dev/null +++ b/adapter-venice/src/main/resources/scripts/key.avsc @@ -0,0 +1 @@ +{"name": "key","type": "string"} diff --git a/adapter-venice/src/main/resources/scripts/prepare.sh b/adapter-venice/src/main/resources/scripts/prepare.sh new file mode 100644 index 000000000..e69de29bb diff --git a/adapter-venice/src/main/resources/scripts/value.avsc b/adapter-venice/src/main/resources/scripts/value.avsc new file mode 100644 index 000000000..93b512e2f --- /dev/null +++ b/adapter-venice/src/main/resources/scripts/value.avsc @@ -0,0 +1 @@ +{"type":"record","name":"Person","namespace":"org.example.WriteKeyValue","fields":[{"name":"age","type":"int"},{"name":"name","type":"string"}]} diff --git a/adapter-venice/src/main/resources/venice.md b/adapter-venice/src/main/resources/venice.md index c8d9c0aa0..09ca42599 100644 --- a/adapter-venice/src/main/resources/venice.md +++ b/adapter-venice/src/main/resources/venice.md @@ -9,6 +9,32 @@ Configuration options: - store_name: the name of the store to use (default: store1) - token: the token to use for authentication (default: none) -# 2. Sample command +# 2. Sample commands -java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100 -v +You can run Venice using the Venice Standalone package: + +```bash +git clone https://github.com/datastax/venice +cd venice +./gradlew :services:venice-standalone:run +``` + +Then you create a "store" using the scripts in the `adapter-venice/src/main/resources` directory: + +```bash +cd adapter-venice/src/main/resources/scripts +./dowload.sh +./create-store.sh store1 +``` + +Then you can populate the store with some data using NB: + +```bash +java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_writer.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports +``` + +And you can read the data back using NB as well: + +```bash +java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports +``` diff --git a/adapter-venice/src/main/resources/venice_reader.yaml b/adapter-venice/src/main/resources/venice_reader.yaml index cfdaa8b89..fe58c9498 100644 --- a/adapter-venice/src/main/resources/venice_reader.yaml +++ b/adapter-venice/src/main/resources/venice_reader.yaml @@ -1,13 +1,10 @@ bindings: - mykey: Mod(5); ToString(); Prefix("key-") - -# document level parameters that apply to all Pulsar client types: -params: - temporary_dest: "false" + mykey: Mod(900000); ToString(); Prefix("name") blocks: read-block: ops: op1: - ReadSingleKey: "store1" + ReadSingleKey: "" key: "{mykey}" + keySchema: "\"string\"" diff --git a/adapter-venice/src/main/resources/venice_writer.yaml b/adapter-venice/src/main/resources/venice_writer.yaml new file mode 100644 index 000000000..d90ded3dd --- /dev/null +++ b/adapter-venice/src/main/resources/venice_writer.yaml @@ -0,0 +1,12 @@ +bindings: + mykey: Mod(900000); ToString(); Prefix("name") + +blocks: + wrtie-block: + ops: + op1: + Write: "" + key: "{mykey}" + keySchema: "\"string\"" + value: "{\"name\":\"{mykey}\",\"age\":10}" + valueSchema: "{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"org.example.WriteKeyValue\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}" diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java index 38c9dc1bf..c71e71d52 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java @@ -394,6 +394,7 @@ public class NBCLI implements Function, NBLabeledElement { if (null != reportGraphiteTo) reporters.addGraphite(reportGraphiteTo, options.wantsMetricsPrefix()); if (null != options.wantsReportCsvTo()) reporters.addCSVReporter(options.wantsReportCsvTo(), options.wantsMetricsPrefix()); + reporters.addLogger(); reporters.start(10, options.getReportInterval()); }