From 298eaa8945ef7630a0ca37bb8059363a4530e0e6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 16 May 2023 12:58:03 +0200 Subject: [PATCH 1/6] [adapter-venice] Introduce new VeniceDB adapter --- adapter-venice/pom.xml | 127 ++++++++++++++++++ .../adapter/venice/VeniceDriverAdapter.java | 52 +++++++ .../adapter/venice/VeniceOpMapper.java | 68 ++++++++++ .../adapter/venice/VeniceOpType.java | 24 ++++ .../adapter/venice/VeniceSpace.java | 99 ++++++++++++++ .../dispensers/ReadSingleKeyOpDispenser.java | 56 ++++++++ .../dispensers/VeniceBaseOpDispenser.java | 127 ++++++++++++++++++ .../adapter/venice/ops/ReadSingleKeyOp.java | 70 ++++++++++ .../adapter/venice/ops/VeniceOp.java | 40 ++++++ .../venice/util/VeniceAdapterMetrics.java | 80 +++++++++++ adapter-venice/src/main/resources/venice.md | 14 ++ .../main/resources/venice_config.properties | 3 + .../src/main/resources/venice_reader.yaml | 13 ++ .../io/nosqlbench/api/config/MapLabels.java | 2 +- nb5/pom.xml | 6 + pom.xml | 2 + 16 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 adapter-venice/pom.xml create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceDriverAdapter.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/VeniceOp.java create mode 100644 adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java create mode 100644 adapter-venice/src/main/resources/venice.md create mode 100644 adapter-venice/src/main/resources/venice_config.properties create mode 100644 adapter-venice/src/main/resources/venice_reader.yaml diff --git a/adapter-venice/pom.xml b/adapter-venice/pom.xml new file mode 100644 index 000000000..5caa33bec --- /dev/null +++ b/adapter-venice/pom.xml @@ -0,0 +1,127 @@ + + + + 4.0.0 + + adapter-venice + jar + + + mvn-defaults + io.nosqlbench + ${revision} + ../mvn-defaults + + + ${project.artifactId} + + A VeniceDB driver for nosqlbench. This provides the ability to read from VeniceDB. + + + + 0.4.17-alpha-8 + + + + + + io.nosqlbench + engine-api + ${revision} + + + + io.nosqlbench + adapters-api + ${revision} + + + + com.linkedin.venice + venice-client-common + ${venice.version} + + + org.sonatype.oss + * + + + org.apache.helix + * + + + org.apache.logging.log4j + * + + + org.conscrypt + * + + + + + + com.linkedin.venice + venice-thin-client + ${venice.version} + + + org.sonatype.oss + * + + + org.apache.helix + * + + + org.apache.logging.log4j + * + + + org.conscrypt + * + + + + + + + + + datastax-public + DataStax Public Repository + default + https://repo.datastax.com/datastax-public-releases-local + + false + + + true + + + + linkedin-oss + https://linkedin.jfrog.io/artifactory/open-source + + false + + + true + + + + diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceDriverAdapter.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceDriverAdapter.java new file mode 100644 index 000000000..09b8e7df5 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceDriverAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice; + +import io.nosqlbench.adapter.venice.ops.VeniceOp; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.nb.annotations.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Function; + +@Service(value = DriverAdapter.class, selector = "venice") +public class VeniceDriverAdapter extends BaseDriverAdapter { + private final static Logger logger = LogManager.getLogger(VeniceDriverAdapter.class); + + @Override + public OpMapper getOpMapper() { + DriverSpaceCache spaceCache = getSpaceCache(); + NBConfiguration adapterConfig = getConfiguration(); + return new VeniceOpMapper(this, adapterConfig, spaceCache); + } + + @Override + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new VeniceSpace(s, cfg); + } + + @Override + public NBConfigModel getConfigModel() { + return super.getConfigModel().add(VeniceSpace.getConfigModel()); + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java new file mode 100644 index 000000000..1dd9094e1 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpMapper.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice; + +import io.nosqlbench.adapter.venice.dispensers.*; +import io.nosqlbench.adapter.venice.ops.VeniceOp; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpDispenser; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.engine.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class VeniceOpMapper implements OpMapper { + + private final static Logger logger = LogManager.getLogger(VeniceOpMapper.class); + + private final NBConfiguration cfg; + private final DriverSpaceCache spaceCache; + private final DriverAdapter adapter; + + public VeniceOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache spaceCache) { + this.cfg = cfg; + this.spaceCache = spaceCache; + this.adapter = adapter; + } + + @Override + public OpDispenser apply(ParsedOp op) { + String spaceName = op.getStaticConfigOr("space", "default"); + VeniceSpace veniceSpace = spaceCache.get(spaceName); + + /* + * If the user provides a body element, then they want to provide the JSON or + * a data structure that can be converted into JSON, bypassing any further + * specialized type-checking or op-type specific features + */ + if (op.isDefined("body")) { + throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field."); + } + else { + TypeAndTarget opType = op.getTypeAndTarget(VeniceOpType.class, String.class); + + return switch (opType.enumId) { + case ReadSingleKey -> + new ReadSingleKeyOpDispenser(adapter, op, veniceSpace); + }; + } + } + +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java new file mode 100644 index 000000000..18285b97b --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceOpType.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice; + +public enum VeniceOpType { + // read a single key + ReadSingleKey +} + + diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java new file mode 100644 index 000000000..e288c68af --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice; + +import com.linkedin.venice.client.store.AvroGenericStoreClient; +import com.linkedin.venice.client.store.ClientConfig; +import com.linkedin.venice.client.store.ClientFactory; +import io.nosqlbench.api.config.standard.ConfigModel; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.api.config.standard.Param; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class VeniceSpace implements AutoCloseable { + + private final static Logger logger = LogManager.getLogger(VeniceSpace.class); + + private final String spaceName; + private final NBConfiguration cfg; + + private final String routerUrl; + private final String storeName; + + private long veniceActivityStartTimeMills; + private final String token; + + private AvroGenericStoreClient client; + + + public VeniceSpace(String spaceName, NBConfiguration cfg) { + this.spaceName = spaceName; + this.cfg = cfg; + + this.routerUrl = cfg.get("router_url"); + this.storeName = cfg.get("store_name"); + this.token = cfg.get("token"); + + this.veniceActivityStartTimeMills = System.currentTimeMillis(); + this.initializeSpace(); + } + + @Override + public void close() { + shutdownSpace(); + } + + public static NBConfigModel getConfigModel() { + return ConfigModel.of(VeniceSpace.class) + .add(Param.defaultTo("router_url", "http://localhost:7777") + .setDescription("Venice Router URL.")) + .add(Param.defaultTo("store_name", "store1") + .setDescription("Name of the Venice store")) + .add(Param.defaultTo("token", "") + .setDescription("JWT Token Authentication")) + .asReadOnly(); + } + + public AvroGenericStoreClient getClient() { + return client; + } + + public void initializeSpace() { + ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName); + clientConfig.setVeniceURL(routerUrl); + clientConfig.setForceClusterDiscoveryAtStartTime(true); + // clientConfig.setToken(token); + client = ClientFactory.getAndStartGenericAvroClient(clientConfig); + } + + public void shutdownSpace() { + try { + client.close(); + } + catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("Unexpected error when shutting down NB S4J space."); + } + } + + public long getVeniceActivityStartTimeMills() { + return veniceActivityStartTimeMills; + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java new file mode 100644 index 000000000..77daf0429 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice.dispensers; + +import io.nosqlbench.adapter.venice.VeniceSpace; +import io.nosqlbench.adapter.venice.ops.ReadSingleKeyOp; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; +public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser"); + private final LongFunction keyStrFunc; + + private static final String KEY_OP_PARAM = "key"; + + public ReadSingleKeyOpDispenser(DriverAdapter adapter, + ParsedOp op, + VeniceSpace s4jSpace) { + super(adapter, op, s4jSpace); + this.keyStrFunc = lookupMandtoryStrOpValueFunc(KEY_OP_PARAM); + } + + @Override + public ReadSingleKeyOp apply(long cycle) { + String key = keyStrFunc.apply(cycle); + return new ReadSingleKeyOp( + veniceAdapterMetrics, + veniceSpace, + key); + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java new file mode 100644 index 000000000..ac5b99a26 --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice.dispensers; + +import io.nosqlbench.adapter.venice.VeniceSpace; +import io.nosqlbench.adapter.venice.ops.VeniceOp; +import io.nosqlbench.adapter.venice.util.*; +import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.*; +import java.util.function.LongFunction; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class VeniceBaseOpDispenser extends BaseOpDispenser { + + private static final Logger logger = LogManager.getLogger("VeniceBaseOpDispenser"); + + protected final ParsedOp parsedOp; + protected final VeniceSpace veniceSpace; + protected final VeniceAdapterMetrics veniceAdapterMetrics; + + protected VeniceBaseOpDispenser(DriverAdapter adapter, + ParsedOp op, + VeniceSpace veniceSpace) { + + super(adapter, op); + + this.parsedOp = op; + this.veniceSpace = veniceSpace; + String defaultMetricsPrefix = parsedOp.getLabels().linearize("activity"); + this.veniceAdapterMetrics = new VeniceAdapterMetrics(defaultMetricsPrefix); + veniceAdapterMetrics.initVeniceAdapterInstrumentation(); + } + + public VeniceSpace getVeniceSpace() { return veniceSpace; } + public VeniceAdapterMetrics getVeniceAdapterMetrics() { return veniceAdapterMetrics; } + + protected LongFunction lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) { + LongFunction booleanLongFunction; + booleanLongFunction = l -> parsedOp.getOptionalStaticConfig(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> BooleanUtils.toBoolean(value)) + .orElse(defaultValue); + logger.info("{}: {}", paramName, booleanLongFunction.apply(0)); + return booleanLongFunction; + } + + protected LongFunction> lookupStaticStrSetOpValueFunc(String paramName) { + LongFunction> setStringLongFunction; + setStringLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> { + Set set = new HashSet<>(); + + if (StringUtils.contains(value,',')) { + set = Arrays.stream(value.split(",")) + .map(String::trim) + .filter(Predicate.not(String::isEmpty)) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + return set; + }).orElse(Collections.emptySet()); + logger.info("{}: {}", paramName, setStringLongFunction.apply(0)); + return setStringLongFunction; + } + + // If the corresponding Op parameter is not provided, use the specified default value + protected LongFunction lookupStaticIntOpValueFunc(String paramName, int defaultValue) { + LongFunction integerLongFunction; + integerLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> NumberUtils.toInt(value)) + .map(value -> { + if (0 > value) return 0; + return value; + }).orElse(defaultValue); + logger.info("{}: {}", paramName, integerLongFunction.apply(0)); + return integerLongFunction; + } + + // If the corresponding Op parameter is not provided, use the specified default value + protected LongFunction lookupOptionalStrOpValueFunc(String paramName, String defaultValue) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class) + .orElse(l -> defaultValue); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + protected LongFunction lookupOptionalStrOpValueFunc(String paramName) { + return lookupOptionalStrOpValueFunc(paramName, ""); + } + + // Mandatory Op parameter. Throw an error if not specified or having empty value + protected LongFunction lookupMandtoryStrOpValueFunc(String paramName) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + + +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java new file mode 100644 index 000000000..8082098fb --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/ReadSingleKeyOp.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice.ops; + + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.linkedin.venice.client.store.AvroGenericStoreClient; +import io.nosqlbench.adapter.venice.VeniceSpace; +import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; + + +public class ReadSingleKeyOp extends VeniceOp { + + private final static Logger logger = LogManager.getLogger("ReadSingleKeyOp"); + + private final AvroGenericStoreClient client; + private final String key; + private final Timer executeTimer; + + private Counter foundCounter; + private Counter notFoundCounter; + + public ReadSingleKeyOp(VeniceAdapterMetrics veniceAdapterMetrics, + VeniceSpace veniceSpace, + String key) { + super(veniceAdapterMetrics, veniceSpace); + this.client = veniceSpace.getClient(); + this.key = key; + this.executeTimer = veniceAdapterMetrics.getExecuteTimer(); + this.foundCounter = veniceAdapterMetrics.getFoundCounter(); + this.notFoundCounter = veniceAdapterMetrics.getNotFoundCounter(); + } + + @Override + public Object apply(long value) { + Object callValue; + try (Timer.Context time = executeTimer.time();) { + CompletableFuture handle = client.get(key); + callValue = handle.join(); + if (logger.isDebugEnabled()) { + logger.debug("ReadSingleKeyOp key={} value={}", key, callValue); + } + } + if (callValue != null) { + foundCounter.inc(); + } else { + notFoundCounter.inc(); + } + return null; + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/VeniceOp.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/VeniceOp.java new file mode 100644 index 000000000..e3a637fdd --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/ops/VeniceOp.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.venice.ops; + +import io.nosqlbench.adapter.venice.VeniceSpace; +import io.nosqlbench.adapter.venice.util.VeniceAdapterMetrics; +import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; + + + +public abstract class VeniceOp implements CycleOp { + protected VeniceAdapterMetrics veniceAdapterMetrics; + protected final VeniceSpace veniceSpace; + protected final long veniceOpStartTimeMills; + + + public VeniceOp( + VeniceAdapterMetrics veniceAdapterMetrics, + VeniceSpace veniceSpace) + { + this.veniceAdapterMetrics = veniceAdapterMetrics; + this.veniceSpace = veniceSpace; + this.veniceOpStartTimeMills = veniceSpace.getVeniceActivityStartTimeMills(); + } +} diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java new file mode 100644 index 000000000..a45233fdb --- /dev/null +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.venice.util; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import io.nosqlbench.api.config.NBLabeledElement; +import io.nosqlbench.api.config.NBLabels; +import io.nosqlbench.api.engine.metrics.ActivityMetrics; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class VeniceAdapterMetrics implements NBLabeledElement { + + private static final Logger logger = LogManager.getLogger("VeniceAdapterMetrics"); + + private final String defaultAdapterMetricsPrefix; + + private Timer executeTimer; + + private Counter foundCounter; + private Counter notFoundCounter; + + public VeniceAdapterMetrics(String defaultMetricsPrefix) { + this.defaultAdapterMetricsPrefix = defaultMetricsPrefix; + } + + public String getName() { + return "VeniceAdapterMetrics"; + } + + public void initVeniceAdapterInstrumentation() { + + this.executeTimer = + ActivityMetrics.timer( + this, + defaultAdapterMetricsPrefix + "execute", + ActivityMetrics.DEFAULT_HDRDIGITS); + + + this.foundCounter = + ActivityMetrics.counter( + this, + defaultAdapterMetricsPrefix + "found"); + + this.notFoundCounter = + ActivityMetrics.counter( + this, + defaultAdapterMetricsPrefix + "notFound"); + } + + public Timer getExecuteTimer() { return executeTimer; } + + public Counter getFoundCounter() { + return foundCounter; + } + + public Counter getNotFoundCounter() { + return notFoundCounter; + } + + @Override + public NBLabels getLabels() { + return NBLabels.forKV(); + } +} diff --git a/adapter-venice/src/main/resources/venice.md b/adapter-venice/src/main/resources/venice.md new file mode 100644 index 000000000..c8d9c0aa0 --- /dev/null +++ b/adapter-venice/src/main/resources/venice.md @@ -0,0 +1,14 @@ +--- +weight: 0 +title: VeniceDB +--- +# 1. Overview + +Configuration options: +- router_url: the address of the Venice Router service (default: http://localhost:7777) +- store_name: the name of the store to use (default: store1) +- token: the token to use for authentication (default: none) + +# 2. Sample command + +java -jar nb5/target/nb5.jar run driver=venice workload=adapter-venice/src/main/resources/venice_reader.yaml store_name=store1 router_url=http://localhost:7777 cycles=100 -v diff --git a/adapter-venice/src/main/resources/venice_config.properties b/adapter-venice/src/main/resources/venice_config.properties new file mode 100644 index 000000000..495b5e08d --- /dev/null +++ b/adapter-venice/src/main/resources/venice_config.properties @@ -0,0 +1,3 @@ +router_url=http://localhost:7777 +store_name=store1 +token= diff --git a/adapter-venice/src/main/resources/venice_reader.yaml b/adapter-venice/src/main/resources/venice_reader.yaml new file mode 100644 index 000000000..cfdaa8b89 --- /dev/null +++ b/adapter-venice/src/main/resources/venice_reader.yaml @@ -0,0 +1,13 @@ +bindings: + mykey: Mod(5); ToString(); Prefix("key-") + +# document level parameters that apply to all Pulsar client types: +params: + temporary_dest: "false" + +blocks: + read-block: + ops: + op1: + ReadSingleKey: "store1" + key: "{mykey}" diff --git a/nb-api/src/main/java/io/nosqlbench/api/config/MapLabels.java b/nb-api/src/main/java/io/nosqlbench/api/config/MapLabels.java index b28a9e910..959d6ced6 100644 --- a/nb-api/src/main/java/io/nosqlbench/api/config/MapLabels.java +++ b/nb-api/src/main/java/io/nosqlbench/api/config/MapLabels.java @@ -31,7 +31,7 @@ public class MapLabels implements NBLabels { parentLabels.forEach(combined::put); childLabels.forEach((k,v) -> { if (combined.containsKey(k)) - throw new RuntimeException("Can't overlap label keys between parent and child elements. parent:" + parentLabels + ", child:" + childLabels); + throw new RuntimeException("Can't overlap label keys (for instance " + k + ") between parent and child elements. parent:" + parentLabels + ", child:" + childLabels); combined.put(k,v); }); labels=Collections.unmodifiableMap(combined); diff --git a/nb5/pom.xml b/nb5/pom.xml index 304970a8a..011378bc0 100644 --- a/nb5/pom.xml +++ b/nb5/pom.xml @@ -95,6 +95,12 @@ ${revision} + + io.nosqlbench + adapter-venice + ${revision} + + io.nosqlbench adapter-s4j diff --git a/pom.xml b/pom.xml index a2fc2663d..732700505 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ adapter-tcp adapter-dynamodb adapter-mongodb + adapter-venice adapter-pulsar adapter-s4j adapter-kafka @@ -98,6 +99,7 @@ adapters-api + adapter-venice adapter-diag adapter-stdout adapter-cqld4 From 3bc4a21a2dcce8307f6992c798d422a65f931c87 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 May 2023 16:29:21 +0200 Subject: [PATCH 2/6] Update VeniceDB client in order to support token auth --- adapter-venice/pom.xml | 2 +- .../src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/adapter-venice/pom.xml b/adapter-venice/pom.xml index 5caa33bec..fc893ecf1 100644 --- a/adapter-venice/pom.xml +++ b/adapter-venice/pom.xml @@ -33,7 +33,7 @@ - 0.4.17-alpha-8 + 0.4.17-alpha-9 diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java index e288c68af..b624d390c 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java @@ -79,7 +79,7 @@ public class VeniceSpace implements AutoCloseable { ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName); clientConfig.setVeniceURL(routerUrl); clientConfig.setForceClusterDiscoveryAtStartTime(true); - // clientConfig.setToken(token); + clientConfig.setToken(token); client = ClientFactory.getAndStartGenericAvroClient(clientConfig); } From 2ef20d0a0f743a6fbee0ae5d12de6e7642086e8d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 May 2023 16:45:49 +0200 Subject: [PATCH 3/6] Update Metrics --- .../venice/util/VeniceAdapterMetrics.java | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java index a45233fdb..45760314e 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/util/VeniceAdapterMetrics.java @@ -18,25 +18,26 @@ package io.nosqlbench.adapter.venice.util; import com.codahale.metrics.Counter; import com.codahale.metrics.Timer; +import io.nosqlbench.adapter.venice.dispensers.VeniceBaseOpDispenser; import io.nosqlbench.api.config.NBLabeledElement; import io.nosqlbench.api.config.NBLabels; import io.nosqlbench.api.engine.metrics.ActivityMetrics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class VeniceAdapterMetrics implements NBLabeledElement { +public class VeniceAdapterMetrics { private static final Logger logger = LogManager.getLogger("VeniceAdapterMetrics"); - private final String defaultAdapterMetricsPrefix; - private Timer executeTimer; private Counter foundCounter; private Counter notFoundCounter; - public VeniceAdapterMetrics(String defaultMetricsPrefix) { - this.defaultAdapterMetricsPrefix = defaultMetricsPrefix; + private final VeniceBaseOpDispenser veniceBaseOpDispenser; + + public VeniceAdapterMetrics(VeniceBaseOpDispenser veniceBaseOpDispenser) { + this.veniceBaseOpDispenser = veniceBaseOpDispenser; } public String getName() { @@ -47,20 +48,17 @@ public class VeniceAdapterMetrics implements NBLabeledElement { this.executeTimer = ActivityMetrics.timer( - this, - defaultAdapterMetricsPrefix + "execute", + veniceBaseOpDispenser,"execute", ActivityMetrics.DEFAULT_HDRDIGITS); this.foundCounter = ActivityMetrics.counter( - this, - defaultAdapterMetricsPrefix + "found"); + veniceBaseOpDispenser,"found"); this.notFoundCounter = ActivityMetrics.counter( - this, - defaultAdapterMetricsPrefix + "notFound"); + veniceBaseOpDispenser, "notFound"); } public Timer getExecuteTimer() { return executeTimer; } @@ -73,8 +71,4 @@ public class VeniceAdapterMetrics implements NBLabeledElement { return notFoundCounter; } - @Override - public NBLabels getLabels() { - return NBLabels.forKV(); - } } From 82e899c8285124294eb30b9ec248a8a3b4671c32 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 May 2023 16:47:17 +0200 Subject: [PATCH 4/6] Fix metrics --- .../adapter/venice/dispensers/VeniceBaseOpDispenser.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java index ac5b99a26..fa04e9861 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java @@ -49,8 +49,7 @@ public abstract class VeniceBaseOpDispenser extends BaseOpDispenser Date: Wed, 17 May 2023 16:52:34 +0200 Subject: [PATCH 5/6] Do not throw errors on shutdown --- .../main/java/io/nosqlbench/adapter/venice/VeniceSpace.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java index b624d390c..ca227105c 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/VeniceSpace.java @@ -88,8 +88,7 @@ public class VeniceSpace implements AutoCloseable { client.close(); } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException("Unexpected error when shutting down NB S4J space."); + logger.error("Unexpected error when shutting down NB S4J space.", e); } } From 8dcc4c8f784597143af17ee58f70797a084e154c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 May 2023 17:27:34 +0200 Subject: [PATCH 6/6] Address some review comments --- .../dispensers/ReadSingleKeyOpDispenser.java | 2 +- .../dispensers/VeniceBaseOpDispenser.java | 57 ------------------- 2 files changed, 1 insertion(+), 58 deletions(-) diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java index 77daf0429..7c4727711 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/ReadSingleKeyOpDispenser.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.function.LongFunction; public class ReadSingleKeyOpDispenser extends VeniceBaseOpDispenser { - private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser"); + private final static Logger logger = LogManager.getLogger("ReadSingleKeyOpDispenser"); private final LongFunction keyStrFunc; private static final String KEY_OP_PARAM = "key"; diff --git a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java index fa04e9861..05c600c48 100644 --- a/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java +++ b/adapter-venice/src/main/java/io/nosqlbench/adapter/venice/dispensers/VeniceBaseOpDispenser.java @@ -56,63 +56,6 @@ public abstract class VeniceBaseOpDispenser extends BaseOpDispenser lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) { - LongFunction booleanLongFunction; - booleanLongFunction = l -> parsedOp.getOptionalStaticConfig(paramName, String.class) - .filter(Predicate.not(String::isEmpty)) - .map(value -> BooleanUtils.toBoolean(value)) - .orElse(defaultValue); - logger.info("{}: {}", paramName, booleanLongFunction.apply(0)); - return booleanLongFunction; - } - - protected LongFunction> lookupStaticStrSetOpValueFunc(String paramName) { - LongFunction> setStringLongFunction; - setStringLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class) - .filter(Predicate.not(String::isEmpty)) - .map(value -> { - Set set = new HashSet<>(); - - if (StringUtils.contains(value,',')) { - set = Arrays.stream(value.split(",")) - .map(String::trim) - .filter(Predicate.not(String::isEmpty)) - .collect(Collectors.toCollection(LinkedHashSet::new)); - } - - return set; - }).orElse(Collections.emptySet()); - logger.info("{}: {}", paramName, setStringLongFunction.apply(0)); - return setStringLongFunction; - } - - // If the corresponding Op parameter is not provided, use the specified default value - protected LongFunction lookupStaticIntOpValueFunc(String paramName, int defaultValue) { - LongFunction integerLongFunction; - integerLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class) - .filter(Predicate.not(String::isEmpty)) - .map(value -> NumberUtils.toInt(value)) - .map(value -> { - if (0 > value) return 0; - return value; - }).orElse(defaultValue); - logger.info("{}: {}", paramName, integerLongFunction.apply(0)); - return integerLongFunction; - } - - // If the corresponding Op parameter is not provided, use the specified default value - protected LongFunction lookupOptionalStrOpValueFunc(String paramName, String defaultValue) { - LongFunction stringLongFunction; - stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class) - .orElse(l -> defaultValue); - logger.info("{}: {}", paramName, stringLongFunction.apply(0)); - - return stringLongFunction; - } - protected LongFunction lookupOptionalStrOpValueFunc(String paramName) { - return lookupOptionalStrOpValueFunc(paramName, ""); - } - // Mandatory Op parameter. Throw an error if not specified or having empty value protected LongFunction lookupMandtoryStrOpValueFunc(String paramName) { LongFunction stringLongFunction;