remove venice adapter unless or until it is resumed, parked under a separate branch

This commit is contained in:
Jonathan Shook 2023-09-28 14:46:09 -05:00
parent 602fbfb15f
commit 10c8da116a
25 changed files with 2 additions and 1054 deletions

View File

@ -1,151 +0,0 @@
<!--
~ Copyright (c) 2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-venice</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
A VeniceDB driver for nosqlbench. This provides the ability to read from VeniceDB.
</description>
<properties>
<venice.version>0.4.17-alpha-12</venice.version>
</properties>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.linkedin.venice</groupId>
<artifactId>venice-client-common</artifactId>
<version>${venice.version}</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.oss</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.helix</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.conscrypt</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.linkedin.venice</groupId>
<artifactId>venice-producer</artifactId>
<version>${venice.version}</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.oss</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.helix</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.conscrypt</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.linkedin.venice</groupId>
<artifactId>venice-thin-client</artifactId>
<version>${venice.version}</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.oss</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.helix</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.conscrypt</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<id>datastax-public</id>
<name>DataStax Public Repository</name>
<layout>default</layout>
<url>https://repo.datastax.com/datastax-public-releases-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>linkedin-oss</id>
<url>https://linkedin.jfrog.io/artifactory/open-source</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</project>

View File

@ -1,52 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice;
import io.nosqlbench.adapter.venice.ops.VeniceOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "venice")
public class VeniceDriverAdapter extends BaseDriverAdapter<VeniceOp, VeniceSpace> {
private final static Logger logger = LogManager.getLogger(VeniceDriverAdapter.class);
@Override
public OpMapper<VeniceOp> getOpMapper() {
DriverSpaceCache<? extends VeniceSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new VeniceOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends VeniceSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new VeniceSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(VeniceSpace.getConfigModel());
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice;
import io.nosqlbench.adapter.venice.dispensers.*;
import io.nosqlbench.adapter.venice.ops.VeniceOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class VeniceOpMapper implements OpMapper<VeniceOp> {
private final static Logger logger = LogManager.getLogger(VeniceOpMapper.class);
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends VeniceSpace> spaceCache;
private final DriverAdapter adapter;
public VeniceOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends VeniceSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends VeniceOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
VeniceSpace veniceSpace = spaceCache.get(spaceName);
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further
* specialized type-checking or op-type specific features
*/
if (op.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
}
else {
TypeAndTarget<VeniceOpType, String> opType = op.getTypeAndTarget(VeniceOpType.class, String.class);
return switch (opType.enumId) {
case ReadSingleKey ->
new ReadSingleKeyOpDispenser(adapter, op, veniceSpace);
case Write ->
new WriteOpDispenser(adapter, op, veniceSpace);
};
}
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice;
public enum VeniceOpType {
// read a single key
ReadSingleKey,
Write
}

View File

@ -1,127 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice;
import com.linkedin.venice.authentication.jwt.ClientAuthenticationProviderToken;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.producer.online.OnlineProducerFactory;
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
import com.linkedin.venice.utils.VeniceProperties;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class VeniceSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(VeniceSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
private final String routerUrl;
private final String storeName;
private long veniceActivityStartTimeMills;
private final String token;
private ClientConfig clientConfig;
private AvroGenericStoreClient<Object, Object> client;
private OnlineVeniceProducer<Object, Object> producer;
public VeniceSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
this.routerUrl = cfg.get("router_url");
this.storeName = cfg.get("store_name");
this.token = cfg.get("token");
this.veniceActivityStartTimeMills = System.currentTimeMillis();
this.initializeSpace();
}
@Override
public void close() {
shutdownSpace();
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(VeniceSpace.class)
.add(Param.defaultTo("router_url", "http://localhost:7777")
.setDescription("Venice Router URL."))
.add(Param.defaultTo("store_name", "store1")
.setDescription("Name of the Venice store"))
.add(Param.defaultTo("token", "")
.setDescription("JWT Token Authentication"))
.asReadOnly();
}
public synchronized AvroGenericStoreClient<Object, Object> getClient() {
if (client == null) {
client = ClientFactory.getAndStartGenericAvroClient(clientConfig);
}
return client;
}
public synchronized OnlineVeniceProducer<Object, Object> getProducer() {
if (producer == null) {
VeniceProperties properties = VeniceProperties.empty();
producer = OnlineProducerFactory.createProducer(clientConfig, properties,null);
}
return producer;
}
public void initializeSpace() {
this.clientConfig = ClientConfig.defaultGenericClientConfig(storeName);
clientConfig.setVeniceURL(routerUrl);
clientConfig.setForceClusterDiscoveryAtStartTime(true);
if (token != null && !token.isEmpty()) {
clientConfig.setAuthenticationProvider(ClientAuthenticationProviderToken.TOKEN(token));
}
}
public void shutdownSpace() {
try {
if (client != null) {
client.close();
}
}
catch (Exception e) {
logger.error("Unexpected error when shutting down NB S4J space.", e);
}
try {
if (producer != null) {
producer.close();
}
}
catch (Exception e) {
logger.error("Unexpected error when shutting down NB S4J space.", e);
}
}
public long getVeniceActivityStartTimeMills() {
return veniceActivityStartTimeMills;
}
}

View File

@ -1,58 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.dispensers;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.ops.ReadSingleKeyOp;
import io.nosqlbench.adapter.venice.util.AvroUtils;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOpDispenser");
private final LongFunction<String> keyStrFunc;
private final Schema keySchema;
private static final String KEY_SCHEMA_OP_PARAM = "keySchema";
private static final String KEY_OP_PARAM = "key";
public ReadSingleKeyOpDispenser(DriverAdapter adapter,
ParsedOp op,
VeniceSpace s4jSpace) {
super(adapter, op, s4jSpace);
this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM);
this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM);
}
@Override
public ReadSingleKeyOp apply(long cycle) {
String key = keyStrFunc.apply(cycle);
Object encodedKey = AvroUtils.encodeToAvro(keySchema, key);
return new ReadSingleKeyOp(
veniceAdapterMetrics,
veniceSpace,
encodedKey);
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.dispensers;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.ops.VeniceOp;
import io.nosqlbench.adapter.venice.util.*;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public abstract class VeniceBaseOpDispenser extends BaseOpDispenser<VeniceOp, VeniceSpace> {
private static final Logger logger = LogManager.getLogger("VeniceBaseOpDispenser");
protected final ParsedOp parsedOp;
protected final VeniceSpace veniceSpace;
protected final VeniceAdapterMetrics veniceAdapterMetrics;
protected VeniceBaseOpDispenser(DriverAdapter adapter,
ParsedOp op,
VeniceSpace veniceSpace) {
super(adapter, op);
this.parsedOp = op;
this.veniceSpace = veniceSpace;
this.veniceAdapterMetrics = new VeniceAdapterMetrics(this);
veniceAdapterMetrics.initVeniceAdapterInstrumentation();
}
public VeniceSpace getVeniceSpace() { return veniceSpace; }
public VeniceAdapterMetrics getVeniceAdapterMetrics() { return veniceAdapterMetrics; }
// Mandatory Op parameter. Throw an error if not specified or having empty value
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
protected Schema lookupAvroSchema(String paramName) {
String schema = parsedOp.getStaticValueOr(paramName, "");
try {
if (schema.isEmpty()) {
schema = Schema.Type.STRING.getName();
logger.info("{}: {} (default)", paramName, schema);
} else {
logger.info("{}: {}", paramName, schema);
}
return AvroUtils.parseAvroSchema(schema);
} catch (Exception err) {
throw new IllegalArgumentException("Cannot parse avro schema "+schema);
}
}
}

View File

@ -1,68 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.dispensers;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.ops.WriteOp;
import io.nosqlbench.adapter.venice.util.AvroUtils;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class WriteOpDispenser extends VeniceBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOpDispenser");
private final LongFunction<String> keyStrFunc;
private final LongFunction<String> valueStrFunc;
private final Schema keySchema;
private final Schema valueSchema;
private static final String KEY_OP_PARAM = "key";
private static final String VALUE_OP_PARAM = "value";
private static final String VALUE_SCHEMA_OP_PARAM = "valueSchema";
private static final String KEY_SCHEMA_OP_PARAM = "keySchema";
public WriteOpDispenser(DriverAdapter adapter,
ParsedOp op,
VeniceSpace s4jSpace) {
super(adapter, op, s4jSpace);
this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM);
this.keySchema = lookupAvroSchema(KEY_SCHEMA_OP_PARAM);
this.valueStrFunc = lookupMandtoryStrOpValueFunc(VALUE_OP_PARAM);
this.valueSchema = lookupAvroSchema(VALUE_SCHEMA_OP_PARAM);
}
@Override
public WriteOp apply(long cycle) {
String key = keyStrFunc.apply(cycle);
String value = valueStrFunc.apply(cycle);
Object encodedKey = AvroUtils.encodeToAvro(keySchema, key);
Object encodedValue = AvroUtils.encodeToAvro(valueSchema, value);
return new WriteOp(
veniceAdapterMetrics,
veniceSpace,
encodedKey,
encodedValue);
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CompletableFuture;
public class ReadSingleKeyOp extends VeniceOp {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp");
private final AvroGenericStoreClient<Object, Object> client;
private final Object key;
private final Timer executeTimer;
private Counter foundCounter;
private Counter notFoundCounter;
public ReadSingleKeyOp(VeniceAdapterMetrics veniceAdapterMetrics,
VeniceSpace veniceSpace,
Object key) {
super(veniceAdapterMetrics, veniceSpace);
this.client = veniceSpace.getClient();
this.key = key;
this.executeTimer = veniceAdapterMetrics.getExecuteTimer();
this.foundCounter = veniceAdapterMetrics.getFoundCounter();
this.notFoundCounter = veniceAdapterMetrics.getNotFoundCounter();
}
@Override
public Object apply(long value) {
Object callValue;
try (Timer.Context time = executeTimer.time();) {
CompletableFuture<Object> handle = client.get(key);
callValue = handle.join();
if (logger.isDebugEnabled()) {
logger.debug("ReadSingleKeyOp key={} value={} latency {}", key, callValue);
}
}
if (callValue != null) {
foundCounter.inc();
} else {
notFoundCounter.inc();
}
return null;
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.ops;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
public abstract class VeniceOp implements CycleOp<Object> {
protected VeniceAdapterMetrics veniceAdapterMetrics;
protected final VeniceSpace veniceSpace;
protected final long veniceOpStartTimeMills;
public VeniceOp(
VeniceAdapterMetrics veniceAdapterMetrics,
VeniceSpace veniceSpace)
{
this.veniceAdapterMetrics = veniceAdapterMetrics;
this.veniceSpace = veniceSpace;
this.veniceOpStartTimeMills = veniceSpace.getVeniceActivityStartTimeMills();
}
}

View File

@ -1,65 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.ops;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.linkedin.venice.producer.DurableWrite;
import com.linkedin.venice.producer.online.OnlineVeniceProducer;
import io.nosqlbench.adapter.venice.VeniceSpace;
import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CompletableFuture;
public class WriteOp extends VeniceOp {
private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp");
private final OnlineVeniceProducer<Object, Object> producer;
private final Object key;
private final Object value;
private final Timer executeTimer;
public WriteOp(VeniceAdapterMetrics veniceAdapterMetrics,
VeniceSpace veniceSpace,
Object key,
Object value) {
super(veniceAdapterMetrics, veniceSpace);
this.producer = veniceSpace.getProducer();
this.key = key;
this.value = value;
this.executeTimer = veniceAdapterMetrics.getExecuteTimer();
}
@Override
public Object apply(long value) {
Object callValue;
try (Timer.Context time = executeTimer.time();) {
CompletableFuture<DurableWrite> handle = producer.asyncPut(this.key, this.value);
callValue = handle.join();
if (logger.isDebugEnabled()) {
logger.debug("Write key={} value={} res {}", key, callValue, callValue);
}
}
return null;
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.util;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
public class AvroUtils {
private static final Logger logger = LogManager.getLogger("AvroUtils");
public static org.apache.avro.Schema parseAvroSchema(String avroSchemDef) {
return new org.apache.avro.Schema.Parser().parse(avroSchemDef);
}
public static Object encodeToAvro(org.apache.avro.Schema schema, String jsonData) {
if (schema.getType() == Schema.Type.STRING) {
return jsonData;
} else if (schema.getType() == Schema.Type.RECORD) {
org.apache.avro.generic.GenericRecord record = null;
try {
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData);
record = reader.read(null, decoder);
} catch (IOException ioe) {
logger.info("Cannot convert JSON {} to AVRO: ", jsonData, ioe);
throw new RuntimeException(ioe);
}
return record;
} else {
throw new RuntimeException("Unsupported schema + " + schema.getType()+ ", only string and record");
}
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.venice.util;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.venice.dispensers.VeniceBaseOpDispenser;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class VeniceAdapterMetrics {
private static final Logger logger = LogManager.getLogger("VeniceAdapterMetrics");
private Timer executeTimer;
private Counter foundCounter;
private Counter notFoundCounter;
private final VeniceBaseOpDispenser veniceBaseOpDispenser;
public VeniceAdapterMetrics(VeniceBaseOpDispenser veniceBaseOpDispenser) {
this.veniceBaseOpDispenser = veniceBaseOpDispenser;
}
public String getName() {
return "VeniceAdapterMetrics";
}
public void initVeniceAdapterInstrumentation() {
this.executeTimer =
ActivityMetrics.timer(
veniceBaseOpDispenser,"execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.foundCounter =
ActivityMetrics.counter(
veniceBaseOpDispenser,"found");
this.notFoundCounter =
ActivityMetrics.counter(
veniceBaseOpDispenser, "notFound");
}
public Timer getExecuteTimer() { return executeTimer; }
public Counter getFoundCounter() {
return foundCounter;
}
public Counter getNotFoundCounter() {
return notFoundCounter;
}
}

View File

@ -1,23 +0,0 @@
#bin/bash
# This is an utility script to create the store in Venice.
# Use ./download.sh in order to download the binaries needed to run this script.
set -x -e
HERE=$(dirname $0)
# move to the directory with the Schema files
cd $HERE
jar=../../../../target/venice-admin-tool-all.jar
storeName=$1
url=http://localhost:5555
clusterName=venice-cluster0
keySchema=key.avsc
valueSchema=value.avsc
# create the store
java -jar $jar --new-store --url $url --cluster $clusterName --store $storeName --key-schema-file $keySchema --value-schema-file $valueSchema --hybrid-data-replication-policy NON_AGGREGATE
# enable incremental push, disable read quota and set NON_AGGREGATE hybrid-data-replication-policy
java -jar $jar --update-store --url $url --cluster $clusterName --store $storeName --storage-quota -1 --incremental-push-enabled true --hybrid-data-replication-policy NON_AGGREGATE --read-quota 1000000 --hybrid-rewind-seconds 86400 --hybrid-offset-lag 1000
# create the first version of the store
java -jar $jar --empty-push --url $url --cluster $clusterName --store $storeName --push-id init --store-size 1000

View File

@ -1,18 +0,0 @@
##########
# This script downloads the binaries needed to run the create-store.sh script.
##########
set -x -e
HERE=$(realpath $(dirname $0))
VENICETOOLSURL=https://github.com/datastax/venice/releases/download/ds-0.4.17-alpha-12/venice-admin-tool-all.jar
BINDIR=$HERE/../../../../target
rm -Rf $BINDIR
mkdir $BINDIR
pushd $BINDIR
cd $BINDIR
curl -L -O $VENICETOOLSURL
popd

View File

@ -1 +0,0 @@
{"name": "key","type": "string"}

View File

@ -1 +0,0 @@
{"type":"record","name":"Person","namespace":"org.example.WriteKeyValue","fields":[{"name":"age","type":"int"},{"name":"name","type":"string"}]}

View File

@ -1,45 +0,0 @@
---
weight: 0
title: VeniceDB
---
# 1. Overview
Configuration options:
- router_url: the address of the Venice Router service (default: http://localhost:7777)
- store_name: the name of the store to use (default: store1)
- token: the token to use for authentication (default: none)
# 2. Sample commands
You can run Venice using the Venice Standalone package:
```bash
git clone https://github.com/datastax/venice
cd venice
./gradlew :services:venice-standalone:run
```
Then you create a "store" using the scripts in the `adapter-venice/src/main/resources` directory:
```bash
cd adapter-venice/src/main/resources/scripts
./dowload.sh
./create-store.sh store1
```
The script creates a Venice store with the given Key and Value schemas defined in the key.avsc and value.avsc files.
Please ensure that you set the same schemas on your workload configuration files (keySchema and valueSchema).
Then you can populate the store with some data using NB.
Open a new terminal and run:
```bash
java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_writer.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports
```
And you can read the data back using NB as well:
```bash
java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100000 -v --report-summary-to stdout:60 --report-csv-to reports
```

View File

@ -1,3 +0,0 @@
router_url=http://localhost:7777
store_name=store1
token=

View File

@ -1,10 +0,0 @@
bindings:
mykey: Mod(900000); ToString(); Prefix("name")
blocks:
read-block:
ops:
op1:
ReadSingleKey: ""
key: "{mykey}"
keySchema: "\"string\""

View File

@ -1,12 +0,0 @@
bindings:
mykey: Mod(900000); ToString(); Prefix("name")
blocks:
wrtie-block:
ops:
op1:
Write: ""
key: "{mykey}"
keySchema: "\"string\""
value: "{\"name\":\"{mykey}\",\"age\":10}"
valueSchema: "{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"org.example.WriteKeyValue\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"

View File

@ -35,6 +35,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<javadoc.name>nosqlbench</javadoc.name>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>

View File

@ -95,12 +95,6 @@
<version>${revision}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.nosqlbench</groupId>-->
<!-- <artifactId>adapter-venice</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-s4j</artifactId>

View File

@ -60,7 +60,6 @@
<module.adapter-tcp>adapter-tcp</module.adapter-tcp>
<module.adapter-dynamodb>adapter-dynamodb</module.adapter-dynamodb>
<module.adapter-mongodb>adapter-mongodb</module.adapter-mongodb>
<!-- <module.adapter-venice>adapter-venice</module.adapter-venice>-->
<module.adapter-pulsar>adapter-pulsar</module.adapter-pulsar>
<module.adapter-s4j>adapter-s4j</module.adapter-s4j>
<module.adapter-kafka>adapter-kafka</module.adapter-kafka>
@ -101,7 +100,6 @@
<module>adapters-api</module>
<!-- driver modules -->
<!-- <module>adapter-venice</module>-->
<module>adapter-diag</module>
<module>adapter-stdout</module>
<module>adapter-cqld4</module>
@ -176,7 +174,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.4.2</version>
<version>3.4.3</version>
</plugin>
</plugins>
</reporting>