[venice] Introduce the Writer operation and clean up the metrics

- introduce a new WriteOp to write directly to Venice
- support AVRO keys for reads and for writes
- remove useless metrics
This commit is contained in:
Enrico Olivelli 2023-05-19 15:07:19 +02:00
parent 45b9a2deb4
commit 88457aa9d9
20 changed files with 371 additions and 30 deletions

View File

@ -33,7 +33,7 @@
</description>
<properties>
<venice.version>0.4.17-alpha-9</venice.version>
<venice.version>0.4.17-alpha-12</venice.version>
</properties>
<dependencies>
@ -74,6 +74,30 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.linkedin.venice</groupId>
<artifactId>venice-producer</artifactId>
<version>${venice.version}</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.oss</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.helix</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.conscrypt</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.linkedin.venice</groupId>
<artifactId>venice-thin-client</artifactId>

View File

@ -61,6 +61,8 @@ public class VeniceOpMapper implements OpMapper<VeniceOp> {
return switch (opType.enumId) {
case ReadSingleKey ->
new ReadSingleKeyOpDispenser(adapter, op, veniceSpace);
case Write ->
new WriteOpDispenser(adapter, op, veniceSpace);
};
}
}

View File

@ -18,7 +18,8 @@ package io.nosqlbench.adapter.venice;
public enum VeniceOpType {
// read a single key
ReadSingleKey
ReadSingleKey,
Write
}

View File

@ -16,9 +16,13 @@
package io.nosqlbench.adapter.venice;
import com.linkedin.venice.authentication.jwt.ClientAuthenticationProviderToken;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.producer.online.OnlineProducerFactory;
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
import com.linkedin.venice.utils.VeniceProperties;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
@ -39,9 +43,12 @@ public class VeniceSpace implements AutoCloseable {
private long veniceActivityStartTimeMills;
private final String token;
private ClientConfig clientConfig;
private AvroGenericStoreClient<Object, Object> client;
private OnlineVeniceProducer<Object, Object> producer;
public VeniceSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
@ -71,25 +78,47 @@ public class VeniceSpace implements AutoCloseable {
.asReadOnly();
}
public AvroGenericStoreClient<Object, Object> getClient() {
public synchronized AvroGenericStoreClient<Object, Object> getClient() {
if (client == null) {
client = ClientFactory.getAndStartGenericAvroClient(clientConfig);
}
return client;
}
public synchronized OnlineVeniceProducer<Object, Object> getProducer() {
if (producer == null) {
VeniceProperties properties = VeniceProperties.empty();
producer = OnlineProducerFactory.createProducer(clientConfig, properties,null);
}
return producer;
}
public void initializeSpace() {
ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName);
this.clientConfig = ClientConfig.defaultGenericClientConfig(storeName);
clientConfig.setVeniceURL(routerUrl);
clientConfig.setForceClusterDiscoveryAtStartTime(true);
clientConfig.setToken(token);
client = ClientFactory.getAndStartGenericAvroClient(clientConfig);
if (token != null && !token.isEmpty()) {
clientConfig.setAuthenticationProvider(ClientAuthenticationProviderToken.TOKEN(token));
}
}
public void shutdownSpace() {
try {
client.close();
if (client != null) {
client.close();
}
}
catch (Exception e) {
logger.error("Unexpected error when shutting down NB S4J space.", e);
}
try {
if (producer != null) {
producer.close();
}
}
catch (Exception e) {
logger.error("Unexpected error when shutting down NB S4J space.", e);
}
}
public long getVeniceActivityStartTimeMills() {

View File

@ -18,8 +18,10 @@ package io.nosqlbench.adapter.venice.dispensers;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.ops.ReadSingleKeyOp;
import io.nosqlbench.adapter.venice.util.AvroUtils;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.avro.Schema;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -36,6 +38,10 @@ public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOpDispenser");
private final LongFunction<String> keyStrFunc;
private final Schema keySchema;
private static final String KEY_SCHEMA_OP_PARAM = "keySchema";
private static final String KEY_OP_PARAM = "key";
public ReadSingleKeyOpDispenser(DriverAdapter adapter,
@ -43,14 +49,16 @@ public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser {
VeniceSpace s4jSpace) {
super(adapter, op, s4jSpace);
this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM);
this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM);
}
@Override
public ReadSingleKeyOp apply(long cycle) {
String key = keyStrFunc.apply(cycle);
Object encodedKey = AvroUtils.encodeToAvro(keySchema, key);
return new ReadSingleKeyOp(
veniceAdapterMetrics,
veniceSpace,
key);
encodedKey);
}
}

View File

@ -22,16 +22,11 @@ import io.nosqlbench.adapter.venice.util.*;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public abstract class VeniceBaseOpDispenser extends BaseOpDispenser<VeniceOp, VeniceSpace> {
@ -65,5 +60,20 @@ public abstract class VeniceBaseOpDispenser extends BaseOpDispenser<VeniceOp, V
return stringLongFunction;
}
protected Schema lookupAvroSchema(String paramName) {
String schema = parsedOp.getStaticValueOr(paramName, "");
try {
if (schema.isEmpty()) {
schema = Schema.Type.STRING.getName();
logger.info("{}: {} (default)", paramName, schema);
} else {
logger.info("{}: {}", paramName, schema);
}
return AvroUtils.parseAvroSchema(schema);
} catch (Exception err) {
throw new IllegalArgumentException("Cannot parse avro schema "+schema);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.dispensers;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.ops.WriteOp;
import io.nosqlbench.adapter.venice.util.AvroUtils;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class WriteOpDispenser extends VeniceBaseOpDispenser {

    // Fixed: logger name was copy-pasted from ReadSingleKeyOpDispenser.
    private final static Logger logger = LogManager.getLogger("WriteOpDispenser");

    private static final String KEY_OP_PARAM = "key";
    private static final String VALUE_OP_PARAM = "value";
    private static final String KEY_SCHEMA_OP_PARAM = "keySchema";
    private static final String VALUE_SCHEMA_OP_PARAM = "valueSchema";

    private final LongFunction<String> keyStrFunc;
    private final LongFunction<String> valueStrFunc;
    private final Schema keySchema;
    private final Schema valueSchema;

    /**
     * Creates a dispenser for Venice write operations.
     *
     * @param adapter  the driver adapter that owns this dispenser
     * @param op       the parsed op template; must define "key" and "value",
     *                 and may define "keySchema"/"valueSchema" (defaults to the
     *                 AVRO string schema — see lookupAvroSchema)
     * @param s4jSpace the Venice space providing the shared producer
     */
    public WriteOpDispenser(DriverAdapter adapter,
                            ParsedOp op,
                            VeniceSpace s4jSpace) {
        super(adapter, op, s4jSpace);
        this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM);
        this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM);
        this.valueStrFunc = lookupMandtoryStrOpValueFunc(VALUE_OP_PARAM);
        this.valueSchema = lookupAvroSchema(VALUE_SCHEMA_OP_PARAM);
    }

    /**
     * Binds the key/value templates for the given cycle and encodes both to
     * AVRO according to the configured schemas before building the op.
     */
    @Override
    public WriteOp apply(long cycle) {
        String key = keyStrFunc.apply(cycle);
        String value = valueStrFunc.apply(cycle);
        Object encodedKey = AvroUtils.encodeToAvro(keySchema, key);
        Object encodedValue = AvroUtils.encodeToAvro(valueSchema, value);
        return new WriteOp(
            veniceAdapterMetrics,
            veniceSpace,
            encodedKey,
            encodedValue);
    }
}

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.venice.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import io.nosqlbench.adapter.venice.VeniceSpace;
@ -33,15 +34,14 @@ public class ReadSingleKeyOp extends VeniceOp {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp");
private final AvroGenericStoreClient<Object, Object> client;
private final String key;
private final Object key;
private final Timer executeTimer;
private Counter foundCounter;
private Counter notFoundCounter;
public ReadSingleKeyOp(VeniceAdapterMetrics veniceAdapterMetrics,
VeniceSpace veniceSpace,
String key) {
Object key) {
super(veniceAdapterMetrics, veniceSpace);
this.client = veniceSpace.getClient();
this.key = key;
@ -57,7 +57,7 @@ public class ReadSingleKeyOp extends VeniceOp {
CompletableFuture<Object> handle = client.get(key);
callValue = handle.join();
if (logger.isDebugEnabled()) {
logger.debug("ReadSingleKeyOp key={} value={}", key, callValue);
logger.debug("ReadSingleKeyOp key={} value={} latency {}", key, callValue);
}
}
if (callValue != null) {

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.ops;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.linkedin.venice.producer.DurableWrite;
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CompletableFuture;
public class WriteOp extends VeniceOp {

    // Fixed: logger name was copy-pasted from ReadSingleKeyOp.
    private final static Logger logger = LogManager.getLogger("WriteOp");

    private final OnlineVeniceProducer<Object, Object> producer;
    private final Object key;
    private final Object value;
    private final Timer executeTimer;

    /**
     * An op that writes one key/value pair to Venice via the online producer.
     *
     * @param veniceAdapterMetrics shared metrics holder (supplies the execute timer)
     * @param veniceSpace          space providing the shared producer instance
     * @param key                  AVRO-encoded key to write
     * @param value                AVRO-encoded value to write
     */
    public WriteOp(VeniceAdapterMetrics veniceAdapterMetrics,
                   VeniceSpace veniceSpace,
                   Object key,
                   Object value) {
        super(veniceAdapterMetrics, veniceSpace);
        this.producer = veniceSpace.getProducer();
        this.key = key;
        this.value = value;
        this.executeTimer = veniceAdapterMetrics.getExecuteTimer();
    }

    /**
     * Executes a synchronous put against Venice, timing the full round trip.
     *
     * @param cycle the current cycle number (unused by this op; renamed from
     *              "value", which shadowed the value field)
     * @return always null; write ops produce no payload for downstream steps
     */
    @Override
    public Object apply(long cycle) {
        try (Timer.Context time = executeTimer.time()) {
            CompletableFuture<DurableWrite> handle = producer.asyncPut(this.key, this.value);
            // Block until the write is durable so the timer covers the whole call.
            DurableWrite result = handle.join();
            if (logger.isDebugEnabled()) {
                // Fixed: original logged the result twice and never logged the written value.
                logger.debug("Write key={} value={} res={}", key, value, result);
            }
        }
        return null;
    }
}

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.util;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
public class AvroUtils {

    private static final Logger logger = LogManager.getLogger("AvroUtils");

    // Utility class: not instantiable.
    private AvroUtils() {
    }

    /**
     * Parses an AVRO schema definition (a JSON schema document, or a bare
     * primitive type name such as "string").
     *
     * @param avroSchemaDef the schema definition text
     * @return the parsed schema
     */
    public static org.apache.avro.Schema parseAvroSchema(String avroSchemaDef) {
        return new org.apache.avro.Schema.Parser().parse(avroSchemaDef);
    }

    /**
     * Encodes a bound string into the object expected by the Venice client for
     * the given schema: the string itself for STRING schemas, or a
     * GenericRecord decoded from the JSON representation for RECORD schemas.
     *
     * @param schema   the target AVRO schema; only STRING and RECORD are supported
     * @param jsonData the raw string (STRING) or a JSON document (RECORD)
     * @return the encoded key/value object
     * @throws RuntimeException if the JSON cannot be decoded against the schema,
     *                          or the schema type is neither STRING nor RECORD
     */
    public static Object encodeToAvro(org.apache.avro.Schema schema, String jsonData) {
        if (schema.getType() == Schema.Type.STRING) {
            return jsonData;
        } else if (schema.getType() == Schema.Type.RECORD) {
            try {
                org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader =
                    new org.apache.avro.generic.GenericDatumReader<>(schema);
                JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData);
                return reader.read(null, decoder);
            } catch (IOException ioe) {
                // Fixed: decode failure was logged at INFO; it is an error condition.
                logger.error("Cannot convert JSON {} to AVRO: ", jsonData, ioe);
                throw new RuntimeException(ioe);
            }
        } else {
            // Fixed: the original message embedded a stray '+' inside the literal.
            throw new RuntimeException("Unsupported schema type " + schema.getType() + ", only string and record are supported");
        }
    }
}

View File

@ -17,10 +17,9 @@
package io.nosqlbench.adapter.venice.util;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.venice.dispensers.VeniceBaseOpDispenser;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -51,7 +50,6 @@ public class VeniceAdapterMetrics {
veniceBaseOpDispenser,"execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.foundCounter =
ActivityMetrics.counter(
veniceBaseOpDispenser,"found");
@ -70,5 +68,4 @@ public class VeniceAdapterMetrics {
public Counter getNotFoundCounter() {
return notFoundCounter;
}
}

View File

@ -0,0 +1,24 @@
#!/bin/bash
# Utility script to create a store in Venice.
# Use ./download.sh first to fetch the binaries needed to run this script.
set -x -e

# Fixed: original used $(realpath $0) — the script FILE, not its directory —
# so the subsequent cd failed.
HERE=$(realpath "$(dirname "$0")")
cd "$HERE"

jar=binaries/*admin-tool-all*.jar
storeName=$1
url=http://localhost:5555
clusterName=venice-cluster0
keySchema=key.avsc
valueSchema=value.avsc

# create the store
java -jar $jar --new-store --url $url --cluster $clusterName --store $storeName --key-schema-file $keySchema --value-schema-file $valueSchema --hybrid-data-replication-policy NON_AGGREGATE

# enable incremental push and disable read quota
java -jar $jar --update-store --url $url --cluster $clusterName --store $storeName --storage-quota -1 --incremental-push-enabled true --hybrid-data-replication-policy NON_AGGREGATE --read-quota 1000000

# create the first version of the store
java -jar $jar --empty-push --url $url --cluster $clusterName --store $storeName --push-id init --store-size 1000

View File

@ -0,0 +1,18 @@
#!/bin/bash
##########
# This script downloads the binaries needed to run the create-store.sh script.
##########
set -x -e

HERE=$(realpath "$(dirname "$0")")
VENICETOOLSURL=https://github.com/datastax/venice/releases/download/ds-0.4.17-alpha-12/venice-admin-tool-all.jar

BINDIR=$HERE/binaries
rm -Rf "$BINDIR"
mkdir "$BINDIR"

pushd "$BINDIR"
# Fixed: original referenced undefined $AVROTOOLSURL, so nothing was downloaded.
curl -L -O "$VENICETOOLSURL"
popd

View File

@ -0,0 +1 @@
{"name": "key","type": "string"}

View File

@ -0,0 +1 @@
{"type":"record","name":"Person","namespace":"org.example.WriteKeyValue","fields":[{"name":"age","type":"int"},{"name":"name","type":"string"}]}

View File

@ -9,6 +9,32 @@ Configuration options:
- store_name: the name of the store to use (default: store1)
- token: the token to use for authentication (default: none)
# 2. Sample command
# 2. Sample commands
java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100 -v
You can run Venice using the Venice Standalone package:
```bash
git clone https://github.com/datastax/venice
cd venice
./gradlew :services:venice-standalone:run
```
Then you create a "store" using the scripts in the `adapter-venice/src/main/resources` directory:
```bash
cd adapter-venice/src/main/resources/scripts
./download.sh
./create-store.sh store1
```
Then you can populate the store with some data using NB:
```bash
java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_writer.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports
```
And you can read the data back using NB as well:
```bash
java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports
```

View File

@ -1,13 +1,10 @@
bindings:
mykey: Mod(5); ToString(); Prefix("key-")
# document level parameters that apply to all Pulsar client types:
params:
temporary_dest: "false"
mykey: Mod(900000); ToString(); Prefix("name")
blocks:
read-block:
ops:
op1:
ReadSingleKey: "store1"
ReadSingleKey: ""
key: "{mykey}"
keySchema: "\"string\""

View File

@ -0,0 +1,12 @@
bindings:
  mykey: Mod(900000); ToString(); Prefix("name")

blocks:
  # Fixed typo: was "wrtie-block".
  write-block:
    ops:
      op1:
        Write: ""
        key: "{mykey}"
        keySchema: "\"string\""
        value: "{\"name\":\"{mykey}\",\"age\":10}"
        valueSchema: "{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"org.example.WriteKeyValue\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"

View File

@ -394,6 +394,7 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
if (null != reportGraphiteTo) reporters.addGraphite(reportGraphiteTo, options.wantsMetricsPrefix());
if (null != options.wantsReportCsvTo())
reporters.addCSVReporter(options.wantsReportCsvTo(), options.wantsMetricsPrefix());
reporters.addLogger();
reporters.start(10, options.getReportInterval());
}