From b1c06661ce514450c83f1c0d53c6fa9a911d8663 Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Wed, 31 May 2023 18:21:52 -0500 Subject: [PATCH] Initial draft of NB5 S4R --- .../nosqlbench/adapter/kafka/KafkaSpace.java | 7 +- .../main/resources/build-nb-kafka-driver.sh | 2 +- .../io/nosqlbench/adapter/s4j/S4JSpace.java | 8 +- .../s4j/dispensers/S4JBaseOpDispenser.java | 1 - adapter-s4r/pom.xml | 81 +++++ .../adapter/s4r/S4RDriverAdapter.java | 52 ++++ .../nosqlbench/adapter/s4r/S4ROpMapper.java | 71 +++++ .../io/nosqlbench/adapter/s4r/S4ROpType.java | 24 ++ .../io/nosqlbench/adapter/s4r/S4RSpace.java | 278 ++++++++++++++++++ .../s4r/dispensers/AmqpBaseOpDispenser.java | 133 +++++++++ .../dispensers/AmqpMsgRecvOpDispenser.java | 127 ++++++++ .../dispensers/AmqpMsgSendOpDispenser.java | 180 ++++++++++++ .../S4RAdapterInvalidParamException.java | 29 ++ .../S4RAdapterUnexpectedException.java | 30 ++ .../S4RAdapterUnsupportedOpException.java | 24 ++ .../s4r/ops/OpTimeTrackAmqpMsgRecvOp.java | 82 ++++++ .../s4r/ops/OpTimeTrackAmqpMsgSendOp.java | 120 ++++++++ .../adapter/s4r/ops/S4RTimeTrackOp.java | 70 +++++ .../adapter/s4r/util/S4RAdapterMetrics.java | 111 +++++++ .../adapter/s4r/util/S4RAdapterUtil.java | 114 +++++++ .../adapter/s4r/util/S4RClientConf.java | 89 ++++++ adapter-s4r/src/main/resources/README.md | 62 ++++ .../src/main/resources/build-nb-s4r-driver.sh | 24 ++ .../src/main/resources/csv/binding_keys.csv | 5 + .../src/main/resources/csv/exchange_names.csv | 4 + .../src/main/resources/csv/queue_names.csv | 3 + .../src/main/resources/csv/routing_keys.csv | 3 + .../src/main/resources/s4r_config.properties | 24 ++ .../src/main/resources/s4r_consumer.yaml | 18 ++ .../src/main/resources/s4r_producer.yaml | 39 +++ .../src/main/resources/start_s4r_consumer.sh | 36 +++ .../src/main/resources/start_s4r_producer.sh | 38 +++ nb5/pom.xml | 6 + pom.xml | 1 + 34 files changed, 1885 insertions(+), 11 deletions(-) create mode 100644 adapter-s4r/pom.xml create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgRecvOpDispenser.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java create mode 100644 adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java create mode 100644 adapter-s4r/src/main/resources/README.md create mode 100755 adapter-s4r/src/main/resources/build-nb-s4r-driver.sh create mode 100644 adapter-s4r/src/main/resources/csv/binding_keys.csv create mode 100644 adapter-s4r/src/main/resources/csv/exchange_names.csv create mode 100644 adapter-s4r/src/main/resources/csv/queue_names.csv create mode 100644 adapter-s4r/src/main/resources/csv/routing_keys.csv create mode 100644 adapter-s4r/src/main/resources/s4r_config.properties create mode 100644 adapter-s4r/src/main/resources/s4r_consumer.yaml create mode 100644 adapter-s4r/src/main/resources/s4r_producer.yaml create mode 100755 adapter-s4r/src/main/resources/start_s4r_consumer.sh create mode 100755 adapter-s4r/src/main/resources/start_s4r_producer.sh diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java index 83d7e28d5..096f92a4e 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java @@ -49,13 +49,14 @@ public class KafkaSpace implements AutoCloseable { private final KafkaClientConf kafkaClientConf; // Whether to do strict error handling while sending/receiving messages - // - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop + // - Yes: any error returned from the Kafka server (or Kafka compatible server like Pulsar) while doing message + // receiving/sending will trigger NB execution stop // - No: pause the current thread that received the error message for 1 second and then continue processing private final boolean strictMsgErrorHandling; - // Maximum time length to execute S4J operations (e.g. message send or consume) + // Maximum time length to execute Kafka operations (e.g. message send or consume) // - when NB execution passes this threshold, it is simply NoOp - // - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes + // - 0 means no maximum time constraint. KafkaOp is always executed until NB execution cycle finishes private final long maxOpTimeInSec; private final long activityStartTimeMills; diff --git a/adapter-kafka/src/main/resources/build-nb-kafka-driver.sh b/adapter-kafka/src/main/resources/build-nb-kafka-driver.sh index cb1a7b691..b31fb0aa0 100755 --- a/adapter-kafka/src/main/resources/build-nb-kafka-driver.sh +++ b/adapter-kafka/src/main/resources/build-nb-kafka-driver.sh @@ -20,5 +20,5 @@ cd "$(git rev-parse --show-toplevel)" && \ mvn clean install "-DskipTests" -pl adapters-api,adapter-kafka,nb5 && \ [[ ${SKIP_TESTS} -ne 1 ]] && \ - mvn test -pl adapters-api,adapter-pulsar + mvn test -pl adapters-api,adapter-kafka ) diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java index 4f84ec8c7..60fa8fa3d 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java @@ -67,7 +67,7 @@ public class S4JSpace implements AutoCloseable { private final int sessionMode; // Whether to do strict error handling while sending/receiving messages - // - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop + // - Yes: any error returned from the JMS/Pulsar server while doing message receiving/sending will trigger NB execution stop // - No: pause the current thread that received the error message for 1 second and then continue processing private boolean strictMsgErrorHandling; @@ -212,10 +212,8 @@ public class S4JSpace implements AutoCloseable { public long getTotalOpResponseCnt() { return totalOpResponseCnt.get();} public long incTotalOpResponseCnt() { return totalOpResponseCnt.incrementAndGet();} - public void resetTotalOpResponseCnt() { totalOpResponseCnt.set(0); } public long getTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.get();} - public void resetTotalNullMsgRecvdCnt() { nullMsgRecvCnt.set(0); } public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); } @@ -252,9 +250,7 @@ public class S4JSpace implements AutoCloseable { } } catch (JMSRuntimeException e) { - if (logger.isDebugEnabled()) { - logger.debug("[ERROR] Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString()); - } + logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString()); throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause()); } catch (Exception e) { diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java index ade86a998..45381a983 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java @@ -68,7 +68,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser + + + 4.0.0 + + adapter-s4r + jar + + + mvn-defaults + io.nosqlbench + ${revision} + ../mvn-defaults + + + ${project.artifactId} + + A AMQP 0.91 driver for nosqlbench. This provides the ability to inject synthetic data + into an AMQP-0.91 (e.g. RabbitMQ) or an AMQP-0.91-compatible (e.g. Pulsar with S4R) system. + + + + 5.17.0 + + + + + io.nosqlbench + engine-api + ${revision} + + + + io.nosqlbench + adapters-api + ${revision} + + + + com.rabbitmq + amqp-client + ${amqp.version} + + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + commons-beanutils + commons-beanutils + 1.9.4 + + + + + org.apache.commons + commons-configuration2 + 2.9.0 + + + + diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java new file mode 100644 index 000000000..4ee136eef --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r; + +import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.nb.annotations.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Function; + +@Service(value = DriverAdapter.class, selector = "s4r") +public class S4RDriverAdapter extends BaseDriverAdapter { + private final static Logger logger = LogManager.getLogger(S4RDriverAdapter.class); + + @Override + public OpMapper getOpMapper() { + DriverSpaceCache spaceCache = getSpaceCache(); + NBConfiguration adapterConfig = getConfiguration(); + return new S4ROpMapper(this, adapterConfig, spaceCache); + } + + @Override + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new S4RSpace(s, cfg); + } + + @Override + public NBConfigModel getConfigModel() { + return super.getConfigModel().add(S4RSpace.getConfigModel()); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java new file mode 100644 index 000000000..5004ae9f2 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r; + +import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgRecvOpDispenser; +import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgSendOpDispenser; +import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpDispenser; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.engine.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class S4ROpMapper implements OpMapper { + + private final static Logger logger = LogManager.getLogger(S4ROpMapper.class); + + private final NBConfiguration cfg; + private final DriverSpaceCache spaceCache; + private final DriverAdapter adapter; + + public S4ROpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache spaceCache) { + this.cfg = cfg; + this.spaceCache = spaceCache; + this.adapter = adapter; + } + + @Override + public OpDispenser apply(ParsedOp op) { + String spaceName = op.getStaticConfigOr("space", "default"); + S4RSpace s4RSpace = spaceCache.get(spaceName); + + /* + * If the user provides a body element, then they want to provide the JSON or + * a data structure that can be converted into JSON, bypassing any further + * specialized type-checking or op-type specific features + */ + if (op.isDefined("body")) { + throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field."); + } + else { + TypeAndTarget opType = op.getTypeAndTarget(S4ROpType.class, String.class); + + return switch (opType.enumId) { + case AmqpMsgSender -> + new AmqpMsgSendOpDispenser(adapter, op, s4RSpace); + case AmqpMsgReceiver -> + new AmqpMsgRecvOpDispenser(adapter, op, s4RSpace); + }; + } + } + +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java new file mode 100644 index 000000000..11baa7ac7 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r; + +public enum S4ROpType { + AmqpMsgSender, + AmqpMsgReceiver +} + + diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java new file mode 100644 index 000000000..645a94aeb --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; +import io.nosqlbench.adapter.s4r.util.S4RClientConf; +import io.nosqlbench.api.config.standard.ConfigModel; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.api.config.standard.Param; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +public class S4RSpace implements AutoCloseable { + + private final static Logger logger = LogManager.getLogger(S4RSpace.class); + + private final String spaceName; + private final NBConfiguration cfg; + + private final S4RClientConf s4rClientConf; + + /////////////////////////////////////////////////////////////////// + // NOTE: in this driver, we assume: + // - possible multiple connections + // - possible multiple channels per connection + // - TBD: only one exchange per channel + // - for senders, possible multiple senders per exchange + // - for receivers, + // * possible multiple queues per exchange + // * possible multiple receivers per queue + // + // Each NB thread is a single sender or receiver + // + // All senders/receivers share the same set of connections/channels/exchanges/queues + /////////////////////////////////////////////////////////////////// + + + // Maximum number of AMQP connections + private final int amqpConnNum; + + // Maximum number of AMQP channels per connection + private final int amqpConnChannelNum; + + // Max number of queues (per exchange) + // - only relevant with message receivers + private final int amqpExchangeQueueNum; + + // Max number of message clients (senders or receivers) + // - for senders, this is the number of message senders per exchange + // - for recievers, this is the number of message receivers per queue + // (there could be multiple queues per exchange) + private final int amqpMsgClntNum; + + + private final AtomicBoolean beingShutdown = new AtomicBoolean(false); + + private ConnectionFactory s4rConnFactory; + + // Default to "direct" type + private String amqpExchangeType = S4RAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label; + + private final ConcurrentHashMap amqpConnections = new ConcurrentHashMap<>(); + + /////////////////////////////////// + // NOTE: Do NOT mix sender and receiver workload in one NB workload + /////////////////////////////////// + + // Amqp Channels for senders + public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { } + private final ConcurrentHashMap amqpSenderChannels = new ConcurrentHashMap<>(); + + // Amqp Channels for receivers + public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { } + private final ConcurrentHashMap amqpReceiverChannels = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>(); + + // Whether to do strict error handling while sending/receiving messages + // - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing + // message receiving/sending will trigger NB execution stop + // - No: pause the current thread that received the error message for 1 second and then continue processing + private final boolean strictMsgErrorHandling; + + // Maximum time length to execute S4R operations (e.g. message send or consume) + // - when NB execution passes this threshold, it is simply NoOp + // - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes + private final long maxOpTimeInSec; + private final long activityStartTimeMills; + + private long totalCycleNum; + private long totalThreadNum; + + public S4RSpace(String spaceName, NBConfiguration cfg) { + this.spaceName = spaceName; + this.cfg = cfg; + + String s4rClientConfFileName = cfg.get("config"); + this.s4rClientConf = new S4RClientConf(s4rClientConfFileName); + this.amqpConnNum = + NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); + this.amqpConnChannelNum = + NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1")); + this.amqpExchangeQueueNum = + NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1")); + this.amqpMsgClntNum = + NumberUtils.toInt(cfg.getOptional("num_msg_clnt").orElse("1")); + this.maxOpTimeInSec = + NumberUtils.toLong(cfg.getOptional("max_op_time").orElse("0L")); + this.strictMsgErrorHandling = + BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false")); + this.activityStartTimeMills = System.currentTimeMillis(); + + this.initializeSpace(s4rClientConf); + } + + @Override + public void close() { + shutdownSpace(); + } + + public static NBConfigModel getConfigModel() { + return ConfigModel.of(S4RSpace.class) + .add(Param.defaultTo("config", "config.properties") + .setDescription("S4R client connection configuration property file.")) + .add(Param.defaultTo("num_conn", 1) + .setDescription("Maximum number of AMQP connections.")) + .add(Param.defaultTo("num_channel", 1) + .setDescription("Maximum number of AMQP channels per connection")) + .add(Param.defaultTo("max_op_time", 0) + .setDescription("Maximum time (in seconds) to run NB Kafka testing scenario.")) + .add(Param.defaultTo("strict_msg_error_handling", false) + .setDescription("Whether to do strict error handling which is to stop NB Kafka execution.")) + .asReadOnly(); + } + + public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); } + + public Channel getAmqpSenderChannel( + AmqpSenderChannelKey key, + Supplier channelSupplier) { + return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get()); + } + + public Channel getAmqpReceiverChannel( + AmqpReceiverChannelKey key, + Supplier channelSupplier) { + return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get()); + } + + public long getActivityStartTimeMills() { return this.activityStartTimeMills; } + public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; } + public S4RClientConf getS4rClientConf() { return s4rClientConf; } + + public String getAmqpExchangeType() { return amqpExchangeType; } + public int getAmqpConnNum() { return this.amqpConnNum; } + public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; } + public int getAmqpExchangeQueueNum() { return this.amqpConnNum; } + public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; } + + public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; } + + public long getTotalCycleNum() { return totalCycleNum; } + public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; } + + public long getTotalThreadNum() { return totalThreadNum; } + public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; } + + public void initializeSpace(S4RClientConf s4rClientConnInfo) { + Map cfgMap = s4rClientConnInfo.getS4rConfMap(); + + if (amqpConnNum < 1) { + String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!"; + throw new S4RAdapterInvalidParamException(errMsg); + } + + if (amqpConnChannelNum < 1) { + String errMsg = "AMQP channel number per connection (\"num_channel\") must be a positive number!"; + throw new S4RAdapterInvalidParamException(errMsg); + } + + amqpExchangeType = cfgMap.get("exchangeType"); + if (!S4RAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) { + String errMsg = "Invalid AMQP exchange type: \"" + amqpExchangeType + "\". " + + "Valid values are: \"" + S4RAdapterUtil.getValidAmqpExchangeTypeList() + "\""; + throw new S4RAdapterInvalidParamException(errMsg); + } + + if (s4rConnFactory == null) { + try { + s4rConnFactory = new ConnectionFactory(); + + String passWord = cfg.get("jwtToken"); + s4rConnFactory.setPassword(cfgMap.get("")); + s4rConnFactory.setPassword(passWord); + + String amqpServerHost = cfg.get("amqpSrvHost"); + s4rConnFactory.setHost(amqpServerHost); + + int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort")); + s4rConnFactory.setPort(amqpServerPort); + + String amqpVirtualHost = cfg.get("virtualHost"); + s4rConnFactory.setVirtualHost(amqpVirtualHost); + + + for (int i = 0; i < getAmqpConnNum(); i++) { + Connection connection = s4rConnFactory.newConnection(); + amqpConnections.put((long) i, connection); + + if (logger.isDebugEnabled()) { + logger.debug("[AMQP Connection created] {} -- [{}] {}", + Thread.currentThread().getName(), + i, + connection); + } + } + } catch (IOException|TimeoutException ex) { + logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", + s4rClientConnInfo.toString()); + throw new S4RAdapterUnexpectedException(ex); + } + } + } + + public void shutdownSpace() { + try { + beingShutdown.set(true); + + for (Channel channel : amqpSenderChannels.values()) { + channel.close(); + } + + for (Channel channel : amqpReceiverChannels.values()) { + channel.close(); + } + + for (Connection connection : amqpConnections.values()) { + connection.close(); + } + + // Pause 5 seconds before closing producers/consumers + S4RAdapterUtil.pauseCurThreadExec(5); + } + catch (Exception ex) { + String exp = "Unexpected error when shutting down the S4R adaptor space"; + logger.error(exp, ex); + } + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java new file mode 100644 index 000000000..109b37898 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.dispensers; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.LongFunction; + +public abstract class AmqpBaseOpDispenser extends BaseOpDispenser { + + private static final Logger logger = LogManager.getLogger("AmqpBaseOpDispenser"); + + protected final ParsedOp parsedOp; + protected final S4RAdapterMetrics s4rAdapterMetrics; + protected final S4RSpace s4rSpace; + + protected final Map s4rConfMap = new HashMap<>(); + protected final String exchangeType; + protected final LongFunction exchangeNameFunc; + + protected AmqpBaseOpDispenser(final DriverAdapter adapter, + final ParsedOp op, + final S4RSpace s4RSpace) { + + super(adapter, op); + + parsedOp = op; + this.s4rSpace = s4RSpace; + + s4rAdapterMetrics = new S4RAdapterMetrics(this, this); + s4rAdapterMetrics.initS4JAdapterInstrumentation(); + + s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap()); + + this.exchangeType = s4RSpace.getAmqpExchangeType(); + this.exchangeNameFunc = lookupMandtoryStrOpValueFunc("exchange_name"); + + s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class))); + s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class))); + } + + protected LongFunction lookupMandtoryStrOpValueFunc(String paramName) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + + protected LongFunction lookupOptionalStrOpValueFunc(String paramName, String defaultValue) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class) + .orElse(l -> defaultValue); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + + protected Channel getChannelWithExchange(Connection amqpConnection, + long connSeqNum, + long channelSeqNum, + String exchangeName) + throws IOException { + Channel channel = amqpConnection.createChannel(); + if (channel == null) { + throw new S4RAdapterUnexpectedException("No AMQP channel is available!"); + } + if (logger.isDebugEnabled()) { + logger.debug("AMQP channel created -- {} [{},{}] ", + channel, + connSeqNum, + channelSeqNum); + } + + AMQP.Exchange.DeclareOk declareOk = + channel.exchangeDeclare(exchangeName, s4rSpace.getAmqpExchangeType()); + if (logger.isDebugEnabled()) { + logger.debug("AMQP exchange declared -- [name: {}, type: {}] {}", + exchangeName, + exchangeType, + declareOk); + } + + return channel; + } + + protected long getConnSeqNum(long cycle) { + return cycle % s4rSpace.getAmqpConnNum(); + } + + protected long getConnChannelSeqNum(long cycle) { + return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum(); + } + + protected String getEffectiveExchangeName(long cycle) { + String exchangeNameInput = exchangeNameFunc.apply(cycle); + return (StringUtils.isBlank(exchangeNameInput) ? "exchange-" + getConnChannelSeqNum(cycle) : exchangeNameInput); + } + + public String getName() { + return "AmqpBaseOpDispenser"; + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgRecvOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgRecvOpDispenser.java new file mode 100644 index 000000000..4a9520a99 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgRecvOpDispenser.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.dispensers; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgRecvOp; +import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Set; +import java.util.function.LongFunction; + +public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser"); + + private final LongFunction bindingKeyFunc; + private final LongFunction queueNameFunc; + public AmqpMsgRecvOpDispenser(DriverAdapter adapter, + ParsedOp op, + S4RSpace s4rSpace) { + super(adapter, op, s4rSpace); + + queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null); + bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); + } + + private long getExchangeQueueSeqNum(long cycle) { + return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) + % s4rSpace.getAmqpExchangeQueueNum(); + } + + private long getQueueReceiverSeqNum(long cycle) { + return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum())) + % s4rSpace.getAmqpMsgClntNum(); + } + + private String getEffectiveQueueName(long cycle) { + String queueNameInput = queueNameFunc.apply(cycle); + return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput); + } + + private Channel getAmqpChannelQueueForReceiver(long cycle, + String exchangeName, + String queueName) { + long connSeqNum = getConnSeqNum(cycle); + long channelSeqNum = getConnChannelSeqNum(cycle); + long queueSeqNum = getExchangeQueueSeqNum(cycle); + long receiverSeqNum = getQueueReceiverSeqNum(cycle); + + Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + + S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey = + new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum); + + return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> { + Channel channel = null; + + try { + channel = getChannelWithExchange( + amqpConnection, + connSeqNum, + channelSeqNum, + exchangeName); + + AMQP.Queue.DeclareOk declareOk = + channel.queueDeclare(queueName, true, true, true, null); + if (logger.isDebugEnabled()) { + logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}", + exchangeName, + queueName, + declareOk); + } + } catch (IOException ex) { + throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); + } + + return channel; + }); + } + + @Override + public S4RTimeTrackOp apply(long cycle) { + Channel channel = null; + + String exchangeName = getEffectiveExchangeName(cycle); + String queueName = getEffectiveQueueName(cycle); + + try { + channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName); + } + catch (Exception ex) { + throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!"); + } + + return new OpTimeTrackAmqpMsgRecvOp( + s4rAdapterMetrics, + s4rSpace, + channel, + exchangeName, + queueName, + bindingKeyFunc.apply(cycle)); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java new file mode 100644 index 000000000..7f7580ffd --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.dispensers; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgSendOp; +import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.function.LongFunction; +import java.util.function.Predicate; + +public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("AmqpMsgSendOpDispenser"); + + private boolean publisherConfirm ; + // Only relevant when 'publisherConfirm' is true + // - default to "individual" confirm + private String confirmMode; + + // Only relevant when 'publisherConfirm' is true and 'confirmMode' is 'batch' + // - default to 100 + private int confirmBatchNum; + + private final LongFunction routingKeyFunc; + private final LongFunction msgPayloadFunc; + + public AmqpMsgSendOpDispenser(DriverAdapter adapter, + ParsedOp op, + S4RSpace s4rSpace) { + super(adapter, op, s4rSpace); + + publisherConfirm = parsedOp + .getOptionalStaticConfig("publisher_confirm", String.class) + .filter(Predicate.not(String::isEmpty)) + .map(BooleanUtils::toBoolean) + .orElse(false); + + confirmMode = parsedOp + .getOptionalStaticValue("confirm_mode", String.class) + .orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label); + if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) { + throw new S4RAdapterInvalidParamException("confirm_mode", + "Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); + } + + confirmBatchNum = parsedOp + .getOptionalStaticConfig("confirm_batch_num", String.class) + .filter(Predicate.not(String::isEmpty)) + .map(NumberUtils::toInt) + .orElse(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM); + if (confirmBatchNum < S4RAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) { + confirmBatchNum = S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM; + } + + routingKeyFunc = lookupOptionalStrOpValueFunc("routing_key", null); + + msgPayloadFunc = lookupMandtoryStrOpValueFunc("message"); + } + + private long getExchangeSenderSeqNum(long cycle) { + return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) + % s4rSpace.getAmqpMsgClntNum(); + } + + private Channel getAmqpChannelForSender(long cycle, + String exchangeName) { + long connSeqNum = getConnSeqNum(cycle); + long channelSeqNum = getConnChannelSeqNum(cycle); + long senderSeqNum = getExchangeSenderSeqNum(cycle); + + Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + + S4RSpace.AmqpSenderChannelKey amqpConnChannelKey = + new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum); + + return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> { + Channel channel = null; + + try { + channel = getChannelWithExchange( + amqpConnection, + connSeqNum, + channelSeqNum, + exchangeName); + + if (publisherConfirm) { + channel.confirmSelect(); + + boolean asyncConfirm = false; + if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) { + asyncConfirm = true; + + channel.addConfirmListener((sequenceNumber, multiple) -> { + // code when message is confirmed + if (logger.isTraceEnabled()) { + logger.debug("Async ack of message publish received: {}, {}", + sequenceNumber, multiple); + } + }, (sequenceNumber, multiple) -> { + // code when message is nack-ed + if (logger.isTraceEnabled()) { + logger.debug("Async n-ack of message publish received: {}, {}", + sequenceNumber, multiple); + } + }); + } + + if (logger.isDebugEnabled()) { + logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}", + !asyncConfirm, + channel); + } + } + + } catch (IOException ex) { + throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); + } + + return channel; + }); + } + + @Override + public S4RTimeTrackOp apply(long cycle) { + String msgPayload = msgPayloadFunc.apply(cycle); + if (StringUtils.isBlank(msgPayload)) { + throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!"); + } + + Channel channel = null; + String exchangeName = getEffectiveExchangeName(cycle); + + try { + channel = getAmqpChannelForSender(cycle, exchangeName); + } + catch (Exception ex) { + throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!"); + } + + return new OpTimeTrackAmqpMsgSendOp( + s4rAdapterMetrics, + s4rSpace, + channel, + exchangeName, + msgPayload, + routingKeyFunc.apply(cycle), + publisherConfirm, + confirmMode, + confirmBatchNum); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java new file mode 100644 index 000000000..c5b56fa04 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4r.exception; + +public class S4RAdapterInvalidParamException extends RuntimeException { + + public S4RAdapterInvalidParamException(String paramName, String errDesc) { + super("Invalid setting for parameter (" + paramName + "): " + errDesc); + } + + public S4RAdapterInvalidParamException(String fullErrDesc) { + super(fullErrDesc); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java new file mode 100644 index 000000000..a0597fb17 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4r.exception; + +public class S4RAdapterUnexpectedException extends RuntimeException { + + public S4RAdapterUnexpectedException(String message) { + super(message); + printStackTrace(); + } + public S4RAdapterUnexpectedException(Exception e) { + super(e); + printStackTrace(); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java new file mode 100644 index 000000000..3e48af652 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.exception; + +public class S4RAdapterUnsupportedOpException extends RuntimeException { + + public S4RAdapterUnsupportedOpException(final String kafkaOpType) { + super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + '"'); + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java new file mode 100644 index 000000000..2ad551ead --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.ops; + + +import com.rabbitmq.client.*; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + + +public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { + + private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp"); + + private final String queueName; + private final String bindingKey; + + public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics, + S4RSpace s4rSpace, + Channel channel, + String exchangeName, + String queueName, + String bindingKey) { + super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); + this.queueName = queueName; + this.bindingKey = bindingKey; + + try { + channel.queueBind(queueName, exchangeName, bindingKey); + } + catch (IOException ex) { + throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " + + "exchange (\"" + exchangeName + "\")!"); + } + } + + @Override + void cycleMsgProcess(long cycle, Object cycleObj) { + try { + Consumer receiver = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + if (logger.isTraceEnabled()) { + String msgPayload = new String(body, StandardCharsets.UTF_8); + logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}", + msgPayload, + consumerTag, + channel); + } + } + }; + + channel.basicConsume(queueName, receiver); + + } + catch (IOException e) { + throw new S4RAdapterUnexpectedException( + "Failed to receive message via the current channel: " + channel); + } + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java new file mode 100644 index 000000000..9c9795fcc --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.ops; + + +import com.rabbitmq.client.Channel; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; +import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + + +public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { + + private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgSendOp"); + + private final String routingKey; + private final boolean publishConfirm; + private final String confirmMode; + private final int confirmBatchNum; + + private static final ThreadLocal + publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0); + + public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics, + S4RSpace s4rSpace, + Channel channel, + String exchangeName, + String message, + String routingKey, + boolean publishConfirm, + String confirmMode, + int confirmBatchNum) { + super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); + this.cycleObj = message; + this.routingKey = routingKey; + this.publishConfirm = publishConfirm; + this.confirmMode = confirmMode; + this.confirmBatchNum = confirmBatchNum; + } + + @Override + void cycleMsgProcess(long cycle, Object cycleObj) { + assert (cycleObj != null); + assert (cycleObj.getClass().equals(String.class)); + + String msgPayload = (String) cycleObj; + + try { + channel.basicPublish( + exchangeName, + routingKey, + null, + msgPayload.getBytes(StandardCharsets.UTF_8)); + + if (publishConfirm) { + // Individual publish confirm + if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) { + channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + } + // Batch publish confirm + else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) { + int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get(); + if ( (publishConfirmTrackingCnt > 0) && + ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || + (publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { + synchronized (this) { + channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + } + } + else { + publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1); + } + } + // Async publish confirm + // - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser' + } + + if (logger.isTraceEnabled()) { + logger.trace("Successfully published message ({}) via the current channel: {}", + msgPayload, channel); + } + } + catch (IllegalStateException ex) { + throw new S4RAdapterUnexpectedException( + "Wait for confirm on a wrong non-confirm channel: " + channel); + } + catch (InterruptedException | TimeoutException ex) { + throw new S4RAdapterUnexpectedException( + "Failed to wait for the ack of the published message (" + msgPayload + + ") via the current channel: " + channel); + } + catch (IOException ex) { + throw new S4RAdapterUnexpectedException( + "Failed to publish message (" + msgPayload + + ") via the current channel: " + channel); + } + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java new file mode 100644 index 000000000..a6726627f --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4r.ops; + +import com.rabbitmq.client.Channel; +import io.nosqlbench.adapter.s4r.S4RSpace; +import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; + +import java.io.IOException; + +public abstract class S4RTimeTrackOp implements CycleOp { + private final S4RAdapterMetrics s4rAdapterMetrics; + protected final S4RSpace s4RSpace; + protected final Channel channel; + protected final String exchangeName; + + // Maximum time length to execute Kafka operations (e.g. message send or consume) + // - when NB execution passes this threshold, it is simply NoOp + // - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes + protected final long maxOpTimeInSec; + + protected final long activityStartTime; + + protected Object cycleObj; + + public S4RTimeTrackOp(S4RAdapterMetrics s4rAdapterMetrics, + S4RSpace s4RSpace, + Channel channel, + String exchangeName) + { + this.s4rAdapterMetrics = s4rAdapterMetrics; + this.s4RSpace = s4RSpace; + this.channel = channel; + this.exchangeName = exchangeName; + this.activityStartTime = s4RSpace.getActivityStartTimeMills(); + this.maxOpTimeInSec = s4RSpace.getMaxOpTimeInSec(); + } + + @Override + public Object apply(long cycle) { + long timeElapsedMills = System.currentTimeMillis() - activityStartTime; + + // If maximum operation duration is specified, only process messages + // before the maximum duration threshold is reached. Otherwise, this is + // just no-op. + if ( (maxOpTimeInSec == 0) || (timeElapsedMills <= (maxOpTimeInSec*1000)) ) { + cycleMsgProcess(cycle, cycleObj); + } + + return null; + } + + abstract void cycleMsgProcess(long cycle, Object cycleObj); +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java new file mode 100644 index 000000000..09eef0159 --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.util; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import io.nosqlbench.adapter.s4r.dispensers.AmqpBaseOpDispenser; +import io.nosqlbench.api.config.NBLabeledElement; +import io.nosqlbench.api.config.NBLabels; +import io.nosqlbench.api.engine.metrics.ActivityMetrics; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class S4RAdapterMetrics { + + private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics"); + private final NBLabels labels; + + private Histogram messageSizeHistogram; + private Timer bindTimer; + private Timer executeTimer; + // - message out of sequence error counter + private Counter msgErrOutOfSeqCounter; + // - message loss counter + private Counter msgErrLossCounter; + // - message duplicate error counter + private Counter msgErrDuplicateCounter; + + public Histogram getE2eMsgProcLatencyHistogram() { + return this.e2eMsgProcLatencyHistogram; + } + + // end-to-end latency + private Histogram e2eMsgProcLatencyHistogram; + private final AmqpBaseOpDispenser s4rBaseOpDispenser; + + public S4RAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) { + this.s4rBaseOpDispenser = s4rBaseOpDispenser; + labels=labeledParent.getLabels().and("name", S4RAdapterMetrics.class.getSimpleName()); + } + + public void initS4JAdapterInstrumentation() { + // Histogram metrics + messageSizeHistogram = + ActivityMetrics.histogram(this.s4rBaseOpDispenser, + "message_size", ActivityMetrics.DEFAULT_HDRDIGITS); + + // Timer metrics + bindTimer = + ActivityMetrics.timer(this.s4rBaseOpDispenser, + "bind", ActivityMetrics.DEFAULT_HDRDIGITS); + executeTimer = + ActivityMetrics.timer(this.s4rBaseOpDispenser, + "execute", ActivityMetrics.DEFAULT_HDRDIGITS); + + // End-to-end metrics + // Latency + e2eMsgProcLatencyHistogram = + ActivityMetrics.histogram(this.s4rBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS); + // Error metrics + msgErrOutOfSeqCounter = + ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_oos"); + msgErrLossCounter = + ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_loss"); + msgErrDuplicateCounter = + ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_dup"); + } + + public Timer getBindTimer() { return bindTimer; } + public Timer getExecuteTimer() { return executeTimer; } + public Histogram getMessagesizeHistogram() { return messageSizeHistogram; } + + public Counter getMsgErrOutOfSeqCounter() { + return msgErrOutOfSeqCounter; + } + + public void setMsgErrOutOfSeqCounter(Counter msgErrOutOfSeqCounter) { + this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter; + } + + public Counter getMsgErrLossCounter() { + return msgErrLossCounter; + } + + public void setMsgErrLossCounter(Counter msgErrLossCounter) { + this.msgErrLossCounter = msgErrLossCounter; + } + + public Counter getMsgErrDuplicateCounter() { + return msgErrDuplicateCounter; + } + + public void setMsgErrDuplicateCounter(Counter msgErrDuplicateCounter) { + this.msgErrDuplicateCounter = msgErrDuplicateCounter; + } +} diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java new file mode 100644 index 000000000..9009d26cc --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.util; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class S4RAdapterUtil { + private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class); + + /////// + // Valid document level parameters for JMS NB yaml file + public enum DOC_LEVEL_PARAMS { + // Blocking message producing or consuming + ASYNC_API("async_api"); + public final String label; + + DOC_LEVEL_PARAMS(final String label) { + this.label = label; + } + } + + public enum AMQP_EXCHANGE_TYPES { + DIRECT("direct"), + FANOUT("fanout"), + TOPIC("topic"), + HEADERS("headers"); + + public final String label; + AMQP_EXCHANGE_TYPES(String label) { + this.label = label; + } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + } + public static String getValidAmqpExchangeTypeList() { + return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", "); + } + + + public enum AMQP_PUB_CONFIRM_MODE { + INDIVIDUAL("individual"), + BATCH("batch"), + ASYNC("async"); + + public final String label; + AMQP_PUB_CONFIRM_MODE(String label) { + this.label = label; + } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + } + public static String getValidAmqpPublisherConfirmModeList() { + return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", "); + } + + // At least 20 messages in a publishing batch + public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20; + public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100; + public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000; + + public static Map convertJsonToMap(final String jsonStr) throws Exception { + final ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonStr, new TypeReference>(){}); + } + + public static void pauseCurThreadExec(final int pauseInSec) { + if (0 < pauseInSec) try { + Thread.sleep(pauseInSec * 1000L); + } catch (final InterruptedException ie) { + ie.printStackTrace(); + } + } + + public static void messageErrorHandling(final Exception exception, final boolean strictErrorHandling, final String errorMsg) { + exception.printStackTrace(); + + if (strictErrorHandling) throw new RuntimeException(errorMsg + " [ " + exception.getMessage() + " ]"); + pauseCurThreadExec(1); + } +} + diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java new file mode 100644 index 000000000..dfc9b0bac --- /dev/null +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4r.util; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.FileBasedConfiguration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; +import org.apache.commons.configuration2.builder.fluent.Parameters; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class S4RClientConf { + private static final Logger logger = LogManager.getLogger(S4RClientConf.class); + + // https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html + private final Map s4rConfMap = new HashMap<>(); + + + public S4RClientConf(final String clientConfFileName) { + + ////////////////// + // Read related S4R client configuration settings from a file + this.readRawConfFromFile(clientConfFileName); + } + + public void readRawConfFromFile(final String fileName) { + final File file = new File(fileName); + + try { + final String canonicalFilePath = file.getCanonicalPath(); + + final Parameters params = new Parameters(); + + final FileBasedConfigurationBuilder builder = + new FileBasedConfigurationBuilder(PropertiesConfiguration.class) + .configure(params.properties() + .setFileName(fileName)); + + final Configuration config = builder.getConfiguration(); + + for (final Iterator it = config.getKeys(); it.hasNext(); ) { + final String confKey = it.next(); + final String confVal = config.getProperty(confKey).toString(); + + // Get client connection specific configuration settings, removing "topic." prefix + if (!StringUtils.isBlank(confVal)) + this.s4rConfMap.put(confKey, confVal); + } + } catch (final IOException ioe) { + S4RClientConf.logger.error("Can't read the specified config properties file: {}", fileName); + ioe.printStackTrace(); + } catch (final ConfigurationException cex) { + S4RClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage()); + cex.printStackTrace(); + } + } + + public Map getS4rConfMap() { return this.s4rConfMap; } + + public String toString() { + return new ToStringBuilder(this). + append("s4rConfMap", this.s4rConfMap). + toString(); + } +} diff --git a/adapter-s4r/src/main/resources/README.md b/adapter-s4r/src/main/resources/README.md new file mode 100644 index 000000000..3e66eb7cc --- /dev/null +++ b/adapter-s4r/src/main/resources/README.md @@ -0,0 +1,62 @@ +# Overview + +This NB Kafka adapter allows publishing messages to or consuming messages from +* a Kafka cluster, or +* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar. + +At high level, this adapter supports the following Kafka functionalities +* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers) +* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits) + * auto message commit + * manual message commit with a configurable number of message commits in one batch +* Kafka Transaction support + +## Example NB Yaml +* [kafka_producer.yaml](./s4r_producer.yaml) +* +* [kafka_consumer.yaml](./s4r_consumer.yaml) + +# Usage + +```bash +## Kafka Producer +$ run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 + +## Kafka Consumer +$ run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 +``` + +## NB Kafka adapter specific CLI parameters + +* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from + * For producer workload, this is the number of the producer threads to publish messages to the same topic + * Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe) + * `threads` and `num_clnt` values MUST be the same. + * For consumer workload, this is the partition number of a topic + * Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number. + * Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe) + * `threads` MUST be equal to `num_clnt`*`num_cons_grp` + +* `num_cons_grp`: the number of consumer groups + * Only relevant for consumer workload + + + +For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported: + +* `async_api` (boolean): + * When true, use async Kafka client API. +* `seq_tracking` (boolean): + * When true, a sequence number is created as part of each message's properties + * This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully. +* `seqerr_simu`: + * A list of error simulation types separated by comma (,) + * Valid error simulation types + * `out_of_order`: simulate message out of sequence + * `msg_loss`: simulate message loss + * `msg_dup`: simulate message duplication + * This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments. +* `e2e_starting_time_source`: + * Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch. + * The possible values for `e2e_starting_time_source`: + * `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type). diff --git a/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh b/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh new file mode 100755 index 000000000..bdb55507f --- /dev/null +++ b/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh @@ -0,0 +1,24 @@ +#!/usr/local/bin/bash +# +# Copyright (c) 2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +: "${SKIP_TESTS:=1}" +( + cd "$(git rev-parse --show-toplevel)" && \ + mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \ + [[ ${SKIP_TESTS} -ne 1 ]] && \ + mvn test -pl adapters-api,adapter-s4r +) diff --git a/adapter-s4r/src/main/resources/csv/binding_keys.csv b/adapter-s4r/src/main/resources/csv/binding_keys.csv new file mode 100644 index 000000000..cd95ef23c --- /dev/null +++ b/adapter-s4r/src/main/resources/csv/binding_keys.csv @@ -0,0 +1,5 @@ +name +alpha +beta +gamma +delta diff --git a/adapter-s4r/src/main/resources/csv/exchange_names.csv b/adapter-s4r/src/main/resources/csv/exchange_names.csv new file mode 100644 index 000000000..3feca9f3f --- /dev/null +++ b/adapter-s4r/src/main/resources/csv/exchange_names.csv @@ -0,0 +1,4 @@ +name +usa +canada +german diff --git a/adapter-s4r/src/main/resources/csv/queue_names.csv b/adapter-s4r/src/main/resources/csv/queue_names.csv new file mode 100644 index 000000000..7c75221b9 --- /dev/null +++ b/adapter-s4r/src/main/resources/csv/queue_names.csv @@ -0,0 +1,3 @@ +queue1 +queue2 +queue3 diff --git a/adapter-s4r/src/main/resources/csv/routing_keys.csv b/adapter-s4r/src/main/resources/csv/routing_keys.csv new file mode 100644 index 000000000..217ebd523 --- /dev/null +++ b/adapter-s4r/src/main/resources/csv/routing_keys.csv @@ -0,0 +1,3 @@ +name +alpha +delta diff --git a/adapter-s4r/src/main/resources/s4r_config.properties b/adapter-s4r/src/main/resources/s4r_config.properties new file mode 100644 index 000000000..6c75f2ac8 --- /dev/null +++ b/adapter-s4r/src/main/resources/s4r_config.properties @@ -0,0 +1,24 @@ +# +# Copyright (c) 2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +## +# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled +amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com +amqpSrvPort=5671 +virtualHost=/rabbitmq +jwtToken= +# valid values: direct, fanout, topic, headers +exchangeType=direct diff --git a/adapter-s4r/src/main/resources/s4r_consumer.yaml b/adapter-s4r/src/main/resources/s4r_consumer.yaml new file mode 100644 index 000000000..d6b96d129 --- /dev/null +++ b/adapter-s4r/src/main/resources/s4r_consumer.yaml @@ -0,0 +1,18 @@ +bindings: + myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name') + myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name') + myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') + +# Doc-level parameters (must be static) +params: + +blocks: + msg-recv-block: + ops: + AmqpMsgReceiver: + #exchange_names: "{myexname}" + exchange_name: "alpha" + + queue_name: "{myqueue}" + + binding_key: "{myroutingkey}" diff --git a/adapter-s4r/src/main/resources/s4r_producer.yaml b/adapter-s4r/src/main/resources/s4r_producer.yaml new file mode 100644 index 000000000..2e95eab69 --- /dev/null +++ b/adapter-s4r/src/main/resources/s4r_producer.yaml @@ -0,0 +1,39 @@ +bindings: + mytext_val: AlphaNumericString(100) + myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name') + myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') + + +# Doc-level parameters (must be static) +params: + # whether to do publisher confirm (for reliable publishing) + # - default: false + publisher_confirm: "false" + #publisher_confirm: "true" + # If 'publisher_confirm' is true, use one of the following 3 confirm modes: + # - individual (wait_for_confirm individually) + # - batch (wait_for_confirm in batch) + # - async [default] + confirm_mode: "aysnc" + #confirm_mode: "individual" + #confirm_mode: "batch" + + # Only relevant when 'publisher_confirm' is true and 'confirm_mode' is "batch" + confirm_batch_num: 100 + # default timeout value (in milliseconds) + # - only relevant when publisher_confirm' is true and 'confirm_mode' is NOT "async" + dft_confirm_timeout_ms: 1000 + + +blocks: + msg-send-block: + ops: + AmqpMsgSender: + #exchange_names: "{myexname}" + exchange_names: "alpha" + + routing_key: "{myroutingkey}" + + ## (Optional) Kafka message value. + # - message key and value can't be both empty at the same time + message: "{mytext_val}" diff --git a/adapter-s4r/src/main/resources/start_s4r_consumer.sh b/adapter-s4r/src/main/resources/start_s4r_consumer.sh new file mode 100755 index 000000000..1be463e4c --- /dev/null +++ b/adapter-s4r/src/main/resources/start_s4r_consumer.sh @@ -0,0 +1,36 @@ +#!/usr/local/bin/bash +# +# Copyright (c) 2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +: "${REBUILD:=1}" +: "${CYCLES:=1000000000}" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +if [[ ${REBUILD} -eq 1 ]]; then + "${SCRIPT_DIR}/build-nb-kafka-driver.sh" +fi +java -jar nb5/target/nb5.jar \ + run \ + driver=s4r \ + -vv \ + --report-interval 5 \ + --docker-metrics \ + cycles=${CYCLES} \ + threads=1 \ + num_clnt=1 \ + num_cons_grp=1 \ + yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \ + config="${SCRIPT_DIR}/kafka_config.properties" \ + bootstrap_server=PLAINTEXT://localhost:9092 diff --git a/adapter-s4r/src/main/resources/start_s4r_producer.sh b/adapter-s4r/src/main/resources/start_s4r_producer.sh new file mode 100755 index 000000000..0999c5fc0 --- /dev/null +++ b/adapter-s4r/src/main/resources/start_s4r_producer.sh @@ -0,0 +1,38 @@ +#!/usr/local/bin/bash +# +# Copyright (c) 2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +: "${REBUILD:=1}" +: "${CYCLES:=1000000000}" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +if [[ ${REBUILD} -eq 1 ]]; then + "${SCRIPT_DIR}/build-nb-kafka-driver.sh" +fi +while [[ 1 -eq 1 ]]; do + java -jar nb5/target/nb5.jar \ + run \ + driver=s4r \ + -vv \ + --report-interval 5 \ + --docker-metrics \ + cycles="${CYCLES}" \ + threads=1 \ + num_clnt=1 \ + yaml="${SCRIPT_DIR}/kafka_producer.yaml" \ + config="${SCRIPT_DIR}/kafka_config.properties" \ + bootstrap_server=PLAINTEXT://localhost:9092 + sleep 10 +done diff --git a/nb5/pom.xml b/nb5/pom.xml index 15984ee7b..558f7acc9 100644 --- a/nb5/pom.xml +++ b/nb5/pom.xml @@ -113,6 +113,12 @@ ${revision} + + io.nosqlbench + adapter-s4r + ${revision} + + io.nosqlbench adapter-jdbc diff --git a/pom.xml b/pom.xml index 6739919a7..d019aacb5 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ adapter-pulsar adapter-s4j adapter-kafka + adapter-s4r adapter-jdbc