mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Initial draft of NB5 S4R
This commit is contained in:
@@ -49,13 +49,14 @@ public class KafkaSpace implements AutoCloseable {
|
|||||||
private final KafkaClientConf kafkaClientConf;
|
private final KafkaClientConf kafkaClientConf;
|
||||||
|
|
||||||
// Whether to do strict error handling while sending/receiving messages
|
// Whether to do strict error handling while sending/receiving messages
|
||||||
// - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop
|
// - Yes: any error returned from the Kafka server (or Kafka compatible server like Pulsar) while doing message
|
||||||
|
// receiving/sending will trigger NB execution stop
|
||||||
// - No: pause the current thread that received the error message for 1 second and then continue processing
|
// - No: pause the current thread that received the error message for 1 second and then continue processing
|
||||||
private final boolean strictMsgErrorHandling;
|
private final boolean strictMsgErrorHandling;
|
||||||
|
|
||||||
// Maximum time length to execute S4J operations (e.g. message send or consume)
|
// Maximum time length to execute Kafka operations (e.g. message send or consume)
|
||||||
// - when NB execution passes this threshold, it is simply NoOp
|
// - when NB execution passes this threshold, it is simply NoOp
|
||||||
// - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes
|
// - 0 means no maximum time constraint. KafkaOp is always executed until NB execution cycle finishes
|
||||||
private final long maxOpTimeInSec;
|
private final long maxOpTimeInSec;
|
||||||
private final long activityStartTimeMills;
|
private final long activityStartTimeMills;
|
||||||
|
|
||||||
|
|||||||
@@ -20,5 +20,5 @@
|
|||||||
cd "$(git rev-parse --show-toplevel)" && \
|
cd "$(git rev-parse --show-toplevel)" && \
|
||||||
mvn clean install "-DskipTests" -pl adapters-api,adapter-kafka,nb5 && \
|
mvn clean install "-DskipTests" -pl adapters-api,adapter-kafka,nb5 && \
|
||||||
[[ ${SKIP_TESTS} -ne 1 ]] && \
|
[[ ${SKIP_TESTS} -ne 1 ]] && \
|
||||||
mvn test -pl adapters-api,adapter-pulsar
|
mvn test -pl adapters-api,adapter-kafka
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
private final int sessionMode;
|
private final int sessionMode;
|
||||||
|
|
||||||
// Whether to do strict error handling while sending/receiving messages
|
// Whether to do strict error handling while sending/receiving messages
|
||||||
// - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop
|
// - Yes: any error returned from the JMS/Pulsar server while doing message receiving/sending will trigger NB execution stop
|
||||||
// - No: pause the current thread that received the error message for 1 second and then continue processing
|
// - No: pause the current thread that received the error message for 1 second and then continue processing
|
||||||
private boolean strictMsgErrorHandling;
|
private boolean strictMsgErrorHandling;
|
||||||
|
|
||||||
@@ -212,10 +212,8 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
|
|
||||||
public long getTotalOpResponseCnt() { return totalOpResponseCnt.get();}
|
public long getTotalOpResponseCnt() { return totalOpResponseCnt.get();}
|
||||||
public long incTotalOpResponseCnt() { return totalOpResponseCnt.incrementAndGet();}
|
public long incTotalOpResponseCnt() { return totalOpResponseCnt.incrementAndGet();}
|
||||||
public void resetTotalOpResponseCnt() { totalOpResponseCnt.set(0); }
|
|
||||||
|
|
||||||
public long getTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.get();}
|
public long getTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.get();}
|
||||||
public void resetTotalNullMsgRecvdCnt() { nullMsgRecvCnt.set(0); }
|
|
||||||
|
|
||||||
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
|
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
|
||||||
|
|
||||||
@@ -252,9 +250,7 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (JMSRuntimeException e) {
|
catch (JMSRuntimeException e) {
|
||||||
if (logger.isDebugEnabled()) {
|
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
|
||||||
logger.debug("[ERROR] Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
|
|
||||||
}
|
|
||||||
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
|
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
|||||||
@@ -68,7 +68,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
|
|||||||
this.parsedOp = op;
|
this.parsedOp = op;
|
||||||
this.s4jSpace = s4jSpace;
|
this.s4jSpace = s4jSpace;
|
||||||
|
|
||||||
String defaultMetricsPrefix = parsedOp.getLabels().linearize("activity");
|
|
||||||
this.s4jAdapterMetrics = new S4JAdapterMetrics(this);
|
this.s4jAdapterMetrics = new S4JAdapterMetrics(this);
|
||||||
s4jAdapterMetrics.initS4JAdapterInstrumentation();
|
s4jAdapterMetrics.initS4JAdapterInstrumentation();
|
||||||
|
|
||||||
|
|||||||
81
adapter-s4r/pom.xml
Normal file
81
adapter-s4r/pom.xml
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
<!--
|
||||||
|
~ Copyright (c) 2022-2023 nosqlbench
|
||||||
|
~
|
||||||
|
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
~ you may not use this file except in compliance with the License.
|
||||||
|
~ You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing, software
|
||||||
|
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
~ See the License for the specific language governing permissions and
|
||||||
|
~ limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>adapter-s4r</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<artifactId>mvn-defaults</artifactId>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
<relativePath>../mvn-defaults</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<name>${project.artifactId}</name>
|
||||||
|
<description>
|
||||||
|
A AMQP 0.91 driver for nosqlbench. This provides the ability to inject synthetic data
|
||||||
|
into an AMQP-0.91 (e.g. RabbitMQ) or an AMQP-0.91-compatible (e.g. Pulsar with S4R) system.
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<amqp.version>5.17.0</amqp.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>engine-api</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>adapters-api</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.rabbitmq</groupId>
|
||||||
|
<artifactId>amqp-client</artifactId>
|
||||||
|
<version>${amqp.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-lang3</artifactId>
|
||||||
|
<version>3.12.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-beanutils</groupId>
|
||||||
|
<artifactId>commons-beanutils</artifactId>
|
||||||
|
<version>1.9.4</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-configuration2</artifactId>
|
||||||
|
<version>2.9.0</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
|
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||||
|
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||||
|
import io.nosqlbench.nb.annotations.Service;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@Service(value = DriverAdapter.class, selector = "s4r")
|
||||||
|
public class S4RDriverAdapter extends BaseDriverAdapter<S4RTimeTrackOp, S4RSpace> {
|
||||||
|
private final static Logger logger = LogManager.getLogger(S4RDriverAdapter.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OpMapper<S4RTimeTrackOp> getOpMapper() {
|
||||||
|
DriverSpaceCache<? extends S4RSpace> spaceCache = getSpaceCache();
|
||||||
|
NBConfiguration adapterConfig = getConfiguration();
|
||||||
|
return new S4ROpMapper(this, adapterConfig, spaceCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Function<String, ? extends S4RSpace> getSpaceInitializer(NBConfiguration cfg) {
|
||||||
|
return (s) -> new S4RSpace(s, cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NBConfigModel getConfigModel() {
|
||||||
|
return super.getConfigModel().add(S4RSpace.getConfigModel());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,71 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgRecvOpDispenser;
|
||||||
|
import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgSendOpDispenser;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
|
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||||
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
|
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
public class S4ROpMapper implements OpMapper<S4RTimeTrackOp> {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(S4ROpMapper.class);
|
||||||
|
|
||||||
|
private final NBConfiguration cfg;
|
||||||
|
private final DriverSpaceCache<? extends S4RSpace> spaceCache;
|
||||||
|
private final DriverAdapter adapter;
|
||||||
|
|
||||||
|
public S4ROpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends S4RSpace> spaceCache) {
|
||||||
|
this.cfg = cfg;
|
||||||
|
this.spaceCache = spaceCache;
|
||||||
|
this.adapter = adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OpDispenser<? extends S4RTimeTrackOp> apply(ParsedOp op) {
|
||||||
|
String spaceName = op.getStaticConfigOr("space", "default");
|
||||||
|
S4RSpace s4RSpace = spaceCache.get(spaceName);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the user provides a body element, then they want to provide the JSON or
|
||||||
|
* a data structure that can be converted into JSON, bypassing any further
|
||||||
|
* specialized type-checking or op-type specific features
|
||||||
|
*/
|
||||||
|
if (op.isDefined("body")) {
|
||||||
|
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
TypeAndTarget<S4ROpType, String> opType = op.getTypeAndTarget(S4ROpType.class, String.class);
|
||||||
|
|
||||||
|
return switch (opType.enumId) {
|
||||||
|
case AmqpMsgSender ->
|
||||||
|
new AmqpMsgSendOpDispenser(adapter, op, s4RSpace);
|
||||||
|
case AmqpMsgReceiver ->
|
||||||
|
new AmqpMsgRecvOpDispenser(adapter, op, s4RSpace);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r;
|
||||||
|
|
||||||
|
public enum S4ROpType {
|
||||||
|
AmqpMsgSender,
|
||||||
|
AmqpMsgReceiver
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -0,0 +1,278 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import com.rabbitmq.client.ConnectionFactory;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RClientConf;
|
||||||
|
import io.nosqlbench.api.config.standard.ConfigModel;
|
||||||
|
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||||
|
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||||
|
import io.nosqlbench.api.config.standard.Param;
|
||||||
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
public class S4RSpace implements AutoCloseable {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(S4RSpace.class);
|
||||||
|
|
||||||
|
private final String spaceName;
|
||||||
|
private final NBConfiguration cfg;
|
||||||
|
|
||||||
|
private final S4RClientConf s4rClientConf;
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////
|
||||||
|
// NOTE: in this driver, we assume:
|
||||||
|
// - possible multiple connections
|
||||||
|
// - possible multiple channels per connection
|
||||||
|
// - TBD: only one exchange per channel
|
||||||
|
// - for senders, possible multiple senders per exchange
|
||||||
|
// - for receivers,
|
||||||
|
// * possible multiple queues per exchange
|
||||||
|
// * possible multiple receivers per queue
|
||||||
|
//
|
||||||
|
// Each NB thread is a single sender or receiver
|
||||||
|
//
|
||||||
|
// All senders/receivers share the same set of connections/channels/exchanges/queues
|
||||||
|
///////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
// Maximum number of AMQP connections
|
||||||
|
private final int amqpConnNum;
|
||||||
|
|
||||||
|
// Maximum number of AMQP channels per connection
|
||||||
|
private final int amqpConnChannelNum;
|
||||||
|
|
||||||
|
// Max number of queues (per exchange)
|
||||||
|
// - only relevant with message receivers
|
||||||
|
private final int amqpExchangeQueueNum;
|
||||||
|
|
||||||
|
// Max number of message clients (senders or receivers)
|
||||||
|
// - for senders, this is the number of message senders per exchange
|
||||||
|
// - for recievers, this is the number of message receivers per queue
|
||||||
|
// (there could be multiple queues per exchange)
|
||||||
|
private final int amqpMsgClntNum;
|
||||||
|
|
||||||
|
|
||||||
|
private final AtomicBoolean beingShutdown = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private ConnectionFactory s4rConnFactory;
|
||||||
|
|
||||||
|
// Default to "direct" type
|
||||||
|
private String amqpExchangeType = S4RAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label;
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<Long, Connection> amqpConnections = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
///////////////////////////////////
|
||||||
|
// NOTE: Do NOT mix sender and receiver workload in one NB workload
|
||||||
|
///////////////////////////////////
|
||||||
|
|
||||||
|
// Amqp Channels for senders
|
||||||
|
public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { }
|
||||||
|
private final ConcurrentHashMap<AmqpSenderChannelKey, Channel> amqpSenderChannels = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
// Amqp Channels for receivers
|
||||||
|
public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { }
|
||||||
|
private final ConcurrentHashMap<AmqpReceiverChannelKey, Channel> amqpReceiverChannels = new ConcurrentHashMap<>();
|
||||||
|
private final ConcurrentHashMap<AmqpReceiverChannelKey, Set<String>> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
// Whether to do strict error handling while sending/receiving messages
|
||||||
|
// - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing
|
||||||
|
// message receiving/sending will trigger NB execution stop
|
||||||
|
// - No: pause the current thread that received the error message for 1 second and then continue processing
|
||||||
|
private final boolean strictMsgErrorHandling;
|
||||||
|
|
||||||
|
// Maximum time length to execute S4R operations (e.g. message send or consume)
|
||||||
|
// - when NB execution passes this threshold, it is simply NoOp
|
||||||
|
// - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes
|
||||||
|
private final long maxOpTimeInSec;
|
||||||
|
private final long activityStartTimeMills;
|
||||||
|
|
||||||
|
private long totalCycleNum;
|
||||||
|
private long totalThreadNum;
|
||||||
|
|
||||||
|
public S4RSpace(String spaceName, NBConfiguration cfg) {
|
||||||
|
this.spaceName = spaceName;
|
||||||
|
this.cfg = cfg;
|
||||||
|
|
||||||
|
String s4rClientConfFileName = cfg.get("config");
|
||||||
|
this.s4rClientConf = new S4RClientConf(s4rClientConfFileName);
|
||||||
|
this.amqpConnNum =
|
||||||
|
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
|
||||||
|
this.amqpConnChannelNum =
|
||||||
|
NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1"));
|
||||||
|
this.amqpExchangeQueueNum =
|
||||||
|
NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1"));
|
||||||
|
this.amqpMsgClntNum =
|
||||||
|
NumberUtils.toInt(cfg.getOptional("num_msg_clnt").orElse("1"));
|
||||||
|
this.maxOpTimeInSec =
|
||||||
|
NumberUtils.toLong(cfg.getOptional("max_op_time").orElse("0L"));
|
||||||
|
this.strictMsgErrorHandling =
|
||||||
|
BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false"));
|
||||||
|
this.activityStartTimeMills = System.currentTimeMillis();
|
||||||
|
|
||||||
|
this.initializeSpace(s4rClientConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
shutdownSpace();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NBConfigModel getConfigModel() {
|
||||||
|
return ConfigModel.of(S4RSpace.class)
|
||||||
|
.add(Param.defaultTo("config", "config.properties")
|
||||||
|
.setDescription("S4R client connection configuration property file."))
|
||||||
|
.add(Param.defaultTo("num_conn", 1)
|
||||||
|
.setDescription("Maximum number of AMQP connections."))
|
||||||
|
.add(Param.defaultTo("num_channel", 1)
|
||||||
|
.setDescription("Maximum number of AMQP channels per connection"))
|
||||||
|
.add(Param.defaultTo("max_op_time", 0)
|
||||||
|
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
|
||||||
|
.add(Param.defaultTo("strict_msg_error_handling", false)
|
||||||
|
.setDescription("Whether to do strict error handling which is to stop NB Kafka execution."))
|
||||||
|
.asReadOnly();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); }
|
||||||
|
|
||||||
|
public Channel getAmqpSenderChannel(
|
||||||
|
AmqpSenderChannelKey key,
|
||||||
|
Supplier<Channel> channelSupplier) {
|
||||||
|
return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Channel getAmqpReceiverChannel(
|
||||||
|
AmqpReceiverChannelKey key,
|
||||||
|
Supplier<Channel> channelSupplier) {
|
||||||
|
return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
|
||||||
|
public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; }
|
||||||
|
public S4RClientConf getS4rClientConf() { return s4rClientConf; }
|
||||||
|
|
||||||
|
public String getAmqpExchangeType() { return amqpExchangeType; }
|
||||||
|
public int getAmqpConnNum() { return this.amqpConnNum; }
|
||||||
|
public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; }
|
||||||
|
public int getAmqpExchangeQueueNum() { return this.amqpConnNum; }
|
||||||
|
public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; }
|
||||||
|
|
||||||
|
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
|
||||||
|
|
||||||
|
public long getTotalCycleNum() { return totalCycleNum; }
|
||||||
|
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
|
||||||
|
|
||||||
|
public long getTotalThreadNum() { return totalThreadNum; }
|
||||||
|
public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; }
|
||||||
|
|
||||||
|
public void initializeSpace(S4RClientConf s4rClientConnInfo) {
|
||||||
|
Map<String, String> cfgMap = s4rClientConnInfo.getS4rConfMap();
|
||||||
|
|
||||||
|
if (amqpConnNum < 1) {
|
||||||
|
String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amqpConnChannelNum < 1) {
|
||||||
|
String errMsg = "AMQP channel number per connection (\"num_channel\") must be a positive number!";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
amqpExchangeType = cfgMap.get("exchangeType");
|
||||||
|
if (!S4RAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) {
|
||||||
|
String errMsg = "Invalid AMQP exchange type: \"" + amqpExchangeType + "\". " +
|
||||||
|
"Valid values are: \"" + S4RAdapterUtil.getValidAmqpExchangeTypeList() + "\"";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (s4rConnFactory == null) {
|
||||||
|
try {
|
||||||
|
s4rConnFactory = new ConnectionFactory();
|
||||||
|
|
||||||
|
String passWord = cfg.get("jwtToken");
|
||||||
|
s4rConnFactory.setPassword(cfgMap.get(""));
|
||||||
|
s4rConnFactory.setPassword(passWord);
|
||||||
|
|
||||||
|
String amqpServerHost = cfg.get("amqpSrvHost");
|
||||||
|
s4rConnFactory.setHost(amqpServerHost);
|
||||||
|
|
||||||
|
int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort"));
|
||||||
|
s4rConnFactory.setPort(amqpServerPort);
|
||||||
|
|
||||||
|
String amqpVirtualHost = cfg.get("virtualHost");
|
||||||
|
s4rConnFactory.setVirtualHost(amqpVirtualHost);
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < getAmqpConnNum(); i++) {
|
||||||
|
Connection connection = s4rConnFactory.newConnection();
|
||||||
|
amqpConnections.put((long) i, connection);
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("[AMQP Connection created] {} -- [{}] {}",
|
||||||
|
Thread.currentThread().getName(),
|
||||||
|
i,
|
||||||
|
connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException|TimeoutException ex) {
|
||||||
|
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
|
||||||
|
s4rClientConnInfo.toString());
|
||||||
|
throw new S4RAdapterUnexpectedException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdownSpace() {
|
||||||
|
try {
|
||||||
|
beingShutdown.set(true);
|
||||||
|
|
||||||
|
for (Channel channel : amqpSenderChannels.values()) {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Channel channel : amqpReceiverChannels.values()) {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Connection connection : amqpConnections.values()) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause 5 seconds before closing producers/consumers
|
||||||
|
S4RAdapterUtil.pauseCurThreadExec(5);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
String exp = "Unexpected error when shutting down the S4R adaptor space";
|
||||||
|
logger.error(exp, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,133 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.dispensers;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.AMQP;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackOp, S4RSpace> {
|
||||||
|
|
||||||
|
private static final Logger logger = LogManager.getLogger("AmqpBaseOpDispenser");
|
||||||
|
|
||||||
|
protected final ParsedOp parsedOp;
|
||||||
|
protected final S4RAdapterMetrics s4rAdapterMetrics;
|
||||||
|
protected final S4RSpace s4rSpace;
|
||||||
|
|
||||||
|
protected final Map<String, String> s4rConfMap = new HashMap<>();
|
||||||
|
protected final String exchangeType;
|
||||||
|
protected final LongFunction<String> exchangeNameFunc;
|
||||||
|
|
||||||
|
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
|
||||||
|
final ParsedOp op,
|
||||||
|
final S4RSpace s4RSpace) {
|
||||||
|
|
||||||
|
super(adapter, op);
|
||||||
|
|
||||||
|
parsedOp = op;
|
||||||
|
this.s4rSpace = s4RSpace;
|
||||||
|
|
||||||
|
s4rAdapterMetrics = new S4RAdapterMetrics(this, this);
|
||||||
|
s4rAdapterMetrics.initS4JAdapterInstrumentation();
|
||||||
|
|
||||||
|
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap());
|
||||||
|
|
||||||
|
this.exchangeType = s4RSpace.getAmqpExchangeType();
|
||||||
|
this.exchangeNameFunc = lookupMandtoryStrOpValueFunc("exchange_name");
|
||||||
|
|
||||||
|
s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
|
||||||
|
s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
|
||||||
|
LongFunction<String> stringLongFunction;
|
||||||
|
stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
|
||||||
|
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
|
||||||
|
|
||||||
|
return stringLongFunction;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
|
||||||
|
LongFunction<String> stringLongFunction;
|
||||||
|
stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class)
|
||||||
|
.orElse(l -> defaultValue);
|
||||||
|
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
|
||||||
|
|
||||||
|
return stringLongFunction;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Channel getChannelWithExchange(Connection amqpConnection,
|
||||||
|
long connSeqNum,
|
||||||
|
long channelSeqNum,
|
||||||
|
String exchangeName)
|
||||||
|
throws IOException {
|
||||||
|
Channel channel = amqpConnection.createChannel();
|
||||||
|
if (channel == null) {
|
||||||
|
throw new S4RAdapterUnexpectedException("No AMQP channel is available!");
|
||||||
|
}
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("AMQP channel created -- {} [{},{}] ",
|
||||||
|
channel,
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
AMQP.Exchange.DeclareOk declareOk =
|
||||||
|
channel.exchangeDeclare(exchangeName, s4rSpace.getAmqpExchangeType());
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("AMQP exchange declared -- [name: {}, type: {}] {}",
|
||||||
|
exchangeName,
|
||||||
|
exchangeType,
|
||||||
|
declareOk);
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getConnSeqNum(long cycle) {
|
||||||
|
return cycle % s4rSpace.getAmqpConnNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getConnChannelSeqNum(long cycle) {
|
||||||
|
return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getEffectiveExchangeName(long cycle) {
|
||||||
|
String exchangeNameInput = exchangeNameFunc.apply(cycle);
|
||||||
|
return (StringUtils.isBlank(exchangeNameInput) ? "exchange-" + getConnChannelSeqNum(cycle) : exchangeNameInput);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return "AmqpBaseOpDispenser";
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,127 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.dispensers;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.AMQP;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgRecvOp;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser");
|
||||||
|
|
||||||
|
private final LongFunction<String> bindingKeyFunc;
|
||||||
|
private final LongFunction<String> queueNameFunc;
|
||||||
|
public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
|
||||||
|
ParsedOp op,
|
||||||
|
S4RSpace s4rSpace) {
|
||||||
|
super(adapter, op, s4rSpace);
|
||||||
|
|
||||||
|
queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null);
|
||||||
|
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getExchangeQueueSeqNum(long cycle) {
|
||||||
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
|
||||||
|
% s4rSpace.getAmqpExchangeQueueNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getQueueReceiverSeqNum(long cycle) {
|
||||||
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum()))
|
||||||
|
% s4rSpace.getAmqpMsgClntNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getEffectiveQueueName(long cycle) {
|
||||||
|
String queueNameInput = queueNameFunc.apply(cycle);
|
||||||
|
return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel getAmqpChannelQueueForReceiver(long cycle,
|
||||||
|
String exchangeName,
|
||||||
|
String queueName) {
|
||||||
|
long connSeqNum = getConnSeqNum(cycle);
|
||||||
|
long channelSeqNum = getConnChannelSeqNum(cycle);
|
||||||
|
long queueSeqNum = getExchangeQueueSeqNum(cycle);
|
||||||
|
long receiverSeqNum = getQueueReceiverSeqNum(cycle);
|
||||||
|
|
||||||
|
Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
|
||||||
|
|
||||||
|
S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey =
|
||||||
|
new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum);
|
||||||
|
|
||||||
|
return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> {
|
||||||
|
Channel channel = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel = getChannelWithExchange(
|
||||||
|
amqpConnection,
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum,
|
||||||
|
exchangeName);
|
||||||
|
|
||||||
|
AMQP.Queue.DeclareOk declareOk =
|
||||||
|
channel.queueDeclare(queueName, true, true, true, null);
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}",
|
||||||
|
exchangeName,
|
||||||
|
queueName,
|
||||||
|
declareOk);
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public S4RTimeTrackOp apply(long cycle) {
|
||||||
|
Channel channel = null;
|
||||||
|
|
||||||
|
String exchangeName = getEffectiveExchangeName(cycle);
|
||||||
|
String queueName = getEffectiveQueueName(cycle);
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new OpTimeTrackAmqpMsgRecvOp(
|
||||||
|
s4rAdapterMetrics,
|
||||||
|
s4rSpace,
|
||||||
|
channel,
|
||||||
|
exchangeName,
|
||||||
|
queueName,
|
||||||
|
bindingKeyFunc.apply(cycle));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,180 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.dispensers;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.AMQP;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.rabbitmq.client.Connection;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgSendOp;
|
||||||
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger("AmqpMsgSendOpDispenser");
|
||||||
|
|
||||||
|
private boolean publisherConfirm ;
|
||||||
|
// Only relevant when 'publisherConfirm' is true
|
||||||
|
// - default to "individual" confirm
|
||||||
|
private String confirmMode;
|
||||||
|
|
||||||
|
// Only relevant when 'publisherConfirm' is true and 'confirmMode' is 'batch'
|
||||||
|
// - default to 100
|
||||||
|
private int confirmBatchNum;
|
||||||
|
|
||||||
|
private final LongFunction<String> routingKeyFunc;
|
||||||
|
private final LongFunction<String> msgPayloadFunc;
|
||||||
|
|
||||||
|
public AmqpMsgSendOpDispenser(DriverAdapter adapter,
|
||||||
|
ParsedOp op,
|
||||||
|
S4RSpace s4rSpace) {
|
||||||
|
super(adapter, op, s4rSpace);
|
||||||
|
|
||||||
|
publisherConfirm = parsedOp
|
||||||
|
.getOptionalStaticConfig("publisher_confirm", String.class)
|
||||||
|
.filter(Predicate.not(String::isEmpty))
|
||||||
|
.map(BooleanUtils::toBoolean)
|
||||||
|
.orElse(false);
|
||||||
|
|
||||||
|
confirmMode = parsedOp
|
||||||
|
.getOptionalStaticValue("confirm_mode", String.class)
|
||||||
|
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
|
||||||
|
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
|
||||||
|
throw new S4RAdapterInvalidParamException("confirm_mode",
|
||||||
|
"Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
confirmBatchNum = parsedOp
|
||||||
|
.getOptionalStaticConfig("confirm_batch_num", String.class)
|
||||||
|
.filter(Predicate.not(String::isEmpty))
|
||||||
|
.map(NumberUtils::toInt)
|
||||||
|
.orElse(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM);
|
||||||
|
if (confirmBatchNum < S4RAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) {
|
||||||
|
confirmBatchNum = S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM;
|
||||||
|
}
|
||||||
|
|
||||||
|
routingKeyFunc = lookupOptionalStrOpValueFunc("routing_key", null);
|
||||||
|
|
||||||
|
msgPayloadFunc = lookupMandtoryStrOpValueFunc("message");
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getExchangeSenderSeqNum(long cycle) {
|
||||||
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
|
||||||
|
% s4rSpace.getAmqpMsgClntNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel getAmqpChannelForSender(long cycle,
|
||||||
|
String exchangeName) {
|
||||||
|
long connSeqNum = getConnSeqNum(cycle);
|
||||||
|
long channelSeqNum = getConnChannelSeqNum(cycle);
|
||||||
|
long senderSeqNum = getExchangeSenderSeqNum(cycle);
|
||||||
|
|
||||||
|
Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
|
||||||
|
|
||||||
|
S4RSpace.AmqpSenderChannelKey amqpConnChannelKey =
|
||||||
|
new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum);
|
||||||
|
|
||||||
|
return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> {
|
||||||
|
Channel channel = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel = getChannelWithExchange(
|
||||||
|
amqpConnection,
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum,
|
||||||
|
exchangeName);
|
||||||
|
|
||||||
|
if (publisherConfirm) {
|
||||||
|
channel.confirmSelect();
|
||||||
|
|
||||||
|
boolean asyncConfirm = false;
|
||||||
|
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
|
||||||
|
asyncConfirm = true;
|
||||||
|
|
||||||
|
channel.addConfirmListener((sequenceNumber, multiple) -> {
|
||||||
|
// code when message is confirmed
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("Async ack of message publish received: {}, {}",
|
||||||
|
sequenceNumber, multiple);
|
||||||
|
}
|
||||||
|
}, (sequenceNumber, multiple) -> {
|
||||||
|
// code when message is nack-ed
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("Async n-ack of message publish received: {}, {}",
|
||||||
|
sequenceNumber, multiple);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}",
|
||||||
|
!asyncConfirm,
|
||||||
|
channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public S4RTimeTrackOp apply(long cycle) {
|
||||||
|
String msgPayload = msgPayloadFunc.apply(cycle);
|
||||||
|
if (StringUtils.isBlank(msgPayload)) {
|
||||||
|
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
|
||||||
|
}
|
||||||
|
|
||||||
|
Channel channel = null;
|
||||||
|
String exchangeName = getEffectiveExchangeName(cycle);
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel = getAmqpChannelForSender(cycle, exchangeName);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new OpTimeTrackAmqpMsgSendOp(
|
||||||
|
s4rAdapterMetrics,
|
||||||
|
s4rSpace,
|
||||||
|
channel,
|
||||||
|
exchangeName,
|
||||||
|
msgPayload,
|
||||||
|
routingKeyFunc.apply(cycle),
|
||||||
|
publisherConfirm,
|
||||||
|
confirmMode,
|
||||||
|
confirmBatchNum);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.exception;
|
||||||
|
|
||||||
|
public class S4RAdapterInvalidParamException extends RuntimeException {
|
||||||
|
|
||||||
|
public S4RAdapterInvalidParamException(String paramName, String errDesc) {
|
||||||
|
super("Invalid setting for parameter (" + paramName + "): " + errDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
public S4RAdapterInvalidParamException(String fullErrDesc) {
|
||||||
|
super(fullErrDesc);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.exception;
|
||||||
|
|
||||||
|
public class S4RAdapterUnexpectedException extends RuntimeException {
|
||||||
|
|
||||||
|
public S4RAdapterUnexpectedException(String message) {
|
||||||
|
super(message);
|
||||||
|
printStackTrace();
|
||||||
|
}
|
||||||
|
public S4RAdapterUnexpectedException(Exception e) {
|
||||||
|
super(e);
|
||||||
|
printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022-2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.exception;
|
||||||
|
|
||||||
|
public class S4RAdapterUnsupportedOpException extends RuntimeException {
|
||||||
|
|
||||||
|
public S4RAdapterUnsupportedOpException(final String kafkaOpType) {
|
||||||
|
super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + '"');
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,82 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.ops;
|
||||||
|
|
||||||
|
|
||||||
|
import com.rabbitmq.client.*;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
|
||||||
|
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");
|
||||||
|
|
||||||
|
private final String queueName;
|
||||||
|
private final String bindingKey;
|
||||||
|
|
||||||
|
public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
|
||||||
|
S4RSpace s4rSpace,
|
||||||
|
Channel channel,
|
||||||
|
String exchangeName,
|
||||||
|
String queueName,
|
||||||
|
String bindingKey) {
|
||||||
|
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
|
||||||
|
this.queueName = queueName;
|
||||||
|
this.bindingKey = bindingKey;
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel.queueBind(queueName, exchangeName, bindingKey);
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " +
|
||||||
|
"exchange (\"" + exchangeName + "\")!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void cycleMsgProcess(long cycle, Object cycleObj) {
|
||||||
|
try {
|
||||||
|
Consumer receiver = new DefaultConsumer(channel) {
|
||||||
|
@Override
|
||||||
|
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
|
||||||
|
byte[] body) throws IOException {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
String msgPayload = new String(body, StandardCharsets.UTF_8);
|
||||||
|
logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}",
|
||||||
|
msgPayload,
|
||||||
|
consumerTag,
|
||||||
|
channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
channel.basicConsume(queueName, receiver);
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
"Failed to receive message via the current channel: " + channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,120 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.ops;
|
||||||
|
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
|
||||||
|
public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgSendOp");
|
||||||
|
|
||||||
|
private final String routingKey;
|
||||||
|
private final boolean publishConfirm;
|
||||||
|
private final String confirmMode;
|
||||||
|
private final int confirmBatchNum;
|
||||||
|
|
||||||
|
private static final ThreadLocal<Integer>
|
||||||
|
publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
|
||||||
|
|
||||||
|
public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
|
||||||
|
S4RSpace s4rSpace,
|
||||||
|
Channel channel,
|
||||||
|
String exchangeName,
|
||||||
|
String message,
|
||||||
|
String routingKey,
|
||||||
|
boolean publishConfirm,
|
||||||
|
String confirmMode,
|
||||||
|
int confirmBatchNum) {
|
||||||
|
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
|
||||||
|
this.cycleObj = message;
|
||||||
|
this.routingKey = routingKey;
|
||||||
|
this.publishConfirm = publishConfirm;
|
||||||
|
this.confirmMode = confirmMode;
|
||||||
|
this.confirmBatchNum = confirmBatchNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void cycleMsgProcess(long cycle, Object cycleObj) {
|
||||||
|
assert (cycleObj != null);
|
||||||
|
assert (cycleObj.getClass().equals(String.class));
|
||||||
|
|
||||||
|
String msgPayload = (String) cycleObj;
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel.basicPublish(
|
||||||
|
exchangeName,
|
||||||
|
routingKey,
|
||||||
|
null,
|
||||||
|
msgPayload.getBytes(StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
if (publishConfirm) {
|
||||||
|
// Individual publish confirm
|
||||||
|
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
|
||||||
|
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
// Batch publish confirm
|
||||||
|
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
|
||||||
|
int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get();
|
||||||
|
if ( (publishConfirmTrackingCnt > 0) &&
|
||||||
|
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
|
||||||
|
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
|
||||||
|
synchronized (this) {
|
||||||
|
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Async publish confirm
|
||||||
|
// - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser'
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Successfully published message ({}) via the current channel: {}",
|
||||||
|
msgPayload, channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IllegalStateException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
"Wait for confirm on a wrong non-confirm channel: " + channel);
|
||||||
|
}
|
||||||
|
catch (InterruptedException | TimeoutException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
"Failed to wait for the ack of the published message (" + msgPayload
|
||||||
|
+ ") via the current channel: " + channel);
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
"Failed to publish message (" + msgPayload
|
||||||
|
+ ") via the current channel: " + channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,70 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.ops;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
|
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public abstract class S4RTimeTrackOp implements CycleOp<Object> {
|
||||||
|
private final S4RAdapterMetrics s4rAdapterMetrics;
|
||||||
|
protected final S4RSpace s4RSpace;
|
||||||
|
protected final Channel channel;
|
||||||
|
protected final String exchangeName;
|
||||||
|
|
||||||
|
// Maximum time length to execute Kafka operations (e.g. message send or consume)
|
||||||
|
// - when NB execution passes this threshold, it is simply NoOp
|
||||||
|
// - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes
|
||||||
|
protected final long maxOpTimeInSec;
|
||||||
|
|
||||||
|
protected final long activityStartTime;
|
||||||
|
|
||||||
|
protected Object cycleObj;
|
||||||
|
|
||||||
|
public S4RTimeTrackOp(S4RAdapterMetrics s4rAdapterMetrics,
|
||||||
|
S4RSpace s4RSpace,
|
||||||
|
Channel channel,
|
||||||
|
String exchangeName)
|
||||||
|
{
|
||||||
|
this.s4rAdapterMetrics = s4rAdapterMetrics;
|
||||||
|
this.s4RSpace = s4RSpace;
|
||||||
|
this.channel = channel;
|
||||||
|
this.exchangeName = exchangeName;
|
||||||
|
this.activityStartTime = s4RSpace.getActivityStartTimeMills();
|
||||||
|
this.maxOpTimeInSec = s4RSpace.getMaxOpTimeInSec();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object apply(long cycle) {
|
||||||
|
long timeElapsedMills = System.currentTimeMillis() - activityStartTime;
|
||||||
|
|
||||||
|
// If maximum operation duration is specified, only process messages
|
||||||
|
// before the maximum duration threshold is reached. Otherwise, this is
|
||||||
|
// just no-op.
|
||||||
|
if ( (maxOpTimeInSec == 0) || (timeElapsedMills <= (maxOpTimeInSec*1000)) ) {
|
||||||
|
cycleMsgProcess(cycle, cycleObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract void cycleMsgProcess(long cycle, Object cycleObj);
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022-2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.util;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Counter;
|
||||||
|
import com.codahale.metrics.Histogram;
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import io.nosqlbench.adapter.s4r.dispensers.AmqpBaseOpDispenser;
|
||||||
|
import io.nosqlbench.api.config.NBLabeledElement;
|
||||||
|
import io.nosqlbench.api.config.NBLabels;
|
||||||
|
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
public class S4RAdapterMetrics {
|
||||||
|
|
||||||
|
private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics");
|
||||||
|
private final NBLabels labels;
|
||||||
|
|
||||||
|
private Histogram messageSizeHistogram;
|
||||||
|
private Timer bindTimer;
|
||||||
|
private Timer executeTimer;
|
||||||
|
// - message out of sequence error counter
|
||||||
|
private Counter msgErrOutOfSeqCounter;
|
||||||
|
// - message loss counter
|
||||||
|
private Counter msgErrLossCounter;
|
||||||
|
// - message duplicate error counter
|
||||||
|
private Counter msgErrDuplicateCounter;
|
||||||
|
|
||||||
|
public Histogram getE2eMsgProcLatencyHistogram() {
|
||||||
|
return this.e2eMsgProcLatencyHistogram;
|
||||||
|
}
|
||||||
|
|
||||||
|
// end-to-end latency
|
||||||
|
private Histogram e2eMsgProcLatencyHistogram;
|
||||||
|
private final AmqpBaseOpDispenser s4rBaseOpDispenser;
|
||||||
|
|
||||||
|
public S4RAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) {
|
||||||
|
this.s4rBaseOpDispenser = s4rBaseOpDispenser;
|
||||||
|
labels=labeledParent.getLabels().and("name", S4RAdapterMetrics.class.getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initS4JAdapterInstrumentation() {
|
||||||
|
// Histogram metrics
|
||||||
|
messageSizeHistogram =
|
||||||
|
ActivityMetrics.histogram(this.s4rBaseOpDispenser,
|
||||||
|
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||||
|
|
||||||
|
// Timer metrics
|
||||||
|
bindTimer =
|
||||||
|
ActivityMetrics.timer(this.s4rBaseOpDispenser,
|
||||||
|
"bind", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||||
|
executeTimer =
|
||||||
|
ActivityMetrics.timer(this.s4rBaseOpDispenser,
|
||||||
|
"execute", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||||
|
|
||||||
|
// End-to-end metrics
|
||||||
|
// Latency
|
||||||
|
e2eMsgProcLatencyHistogram =
|
||||||
|
ActivityMetrics.histogram(this.s4rBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||||
|
// Error metrics
|
||||||
|
msgErrOutOfSeqCounter =
|
||||||
|
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_oos");
|
||||||
|
msgErrLossCounter =
|
||||||
|
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_loss");
|
||||||
|
msgErrDuplicateCounter =
|
||||||
|
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_dup");
|
||||||
|
}
|
||||||
|
|
||||||
|
public Timer getBindTimer() { return bindTimer; }
|
||||||
|
public Timer getExecuteTimer() { return executeTimer; }
|
||||||
|
public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
|
||||||
|
|
||||||
|
public Counter getMsgErrOutOfSeqCounter() {
|
||||||
|
return msgErrOutOfSeqCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsgErrOutOfSeqCounter(Counter msgErrOutOfSeqCounter) {
|
||||||
|
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Counter getMsgErrLossCounter() {
|
||||||
|
return msgErrLossCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsgErrLossCounter(Counter msgErrLossCounter) {
|
||||||
|
this.msgErrLossCounter = msgErrLossCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Counter getMsgErrDuplicateCounter() {
|
||||||
|
return msgErrDuplicateCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsgErrDuplicateCounter(Counter msgErrDuplicateCounter) {
|
||||||
|
this.msgErrDuplicateCounter = msgErrDuplicateCounter;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,114 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.util;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class S4RAdapterUtil {
|
||||||
|
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);
|
||||||
|
|
||||||
|
///////
|
||||||
|
// Valid document level parameters for JMS NB yaml file
|
||||||
|
public enum DOC_LEVEL_PARAMS {
|
||||||
|
// Blocking message producing or consuming
|
||||||
|
ASYNC_API("async_api");
|
||||||
|
public final String label;
|
||||||
|
|
||||||
|
DOC_LEVEL_PARAMS(final String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum AMQP_EXCHANGE_TYPES {
|
||||||
|
DIRECT("direct"),
|
||||||
|
FANOUT("fanout"),
|
||||||
|
TOPIC("topic"),
|
||||||
|
HEADERS("headers");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
AMQP_EXCHANGE_TYPES(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
|
||||||
|
public static boolean isValidLabel(String label) {
|
||||||
|
return LABELS.contains(label);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static String getValidAmqpExchangeTypeList() {
|
||||||
|
return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", ");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public enum AMQP_PUB_CONFIRM_MODE {
|
||||||
|
INDIVIDUAL("individual"),
|
||||||
|
BATCH("batch"),
|
||||||
|
ASYNC("async");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
AMQP_PUB_CONFIRM_MODE(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
|
||||||
|
public static boolean isValidLabel(String label) {
|
||||||
|
return LABELS.contains(label);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static String getValidAmqpPublisherConfirmModeList() {
|
||||||
|
return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", ");
|
||||||
|
}
|
||||||
|
|
||||||
|
// At least 20 messages in a publishing batch
|
||||||
|
public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20;
|
||||||
|
public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100;
|
||||||
|
public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
|
public static Map<String, String> convertJsonToMap(final String jsonStr) throws Exception {
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.readValue(jsonStr, new TypeReference<Map<String, String>>(){});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void pauseCurThreadExec(final int pauseInSec) {
|
||||||
|
if (0 < pauseInSec) try {
|
||||||
|
Thread.sleep(pauseInSec * 1000L);
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
ie.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void messageErrorHandling(final Exception exception, final boolean strictErrorHandling, final String errorMsg) {
|
||||||
|
exception.printStackTrace();
|
||||||
|
|
||||||
|
if (strictErrorHandling) throw new RuntimeException(errorMsg + " [ " + exception.getMessage() + " ]");
|
||||||
|
pauseCurThreadExec(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022-2023 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.s4r.util;
|
||||||
|
|
||||||
|
import org.apache.commons.configuration2.Configuration;
|
||||||
|
import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||||
|
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||||
|
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||||
|
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||||
|
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class S4RClientConf {
|
||||||
|
private static final Logger logger = LogManager.getLogger(S4RClientConf.class);
|
||||||
|
|
||||||
|
// https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html
|
||||||
|
private final Map<String, String> s4rConfMap = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
|
public S4RClientConf(final String clientConfFileName) {
|
||||||
|
|
||||||
|
//////////////////
|
||||||
|
// Read related S4R client configuration settings from a file
|
||||||
|
this.readRawConfFromFile(clientConfFileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readRawConfFromFile(final String fileName) {
|
||||||
|
final File file = new File(fileName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final String canonicalFilePath = file.getCanonicalPath();
|
||||||
|
|
||||||
|
final Parameters params = new Parameters();
|
||||||
|
|
||||||
|
final FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
|
||||||
|
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
|
||||||
|
.configure(params.properties()
|
||||||
|
.setFileName(fileName));
|
||||||
|
|
||||||
|
final Configuration config = builder.getConfiguration();
|
||||||
|
|
||||||
|
for (final Iterator<String> it = config.getKeys(); it.hasNext(); ) {
|
||||||
|
final String confKey = it.next();
|
||||||
|
final String confVal = config.getProperty(confKey).toString();
|
||||||
|
|
||||||
|
// Get client connection specific configuration settings, removing "topic." prefix
|
||||||
|
if (!StringUtils.isBlank(confVal))
|
||||||
|
this.s4rConfMap.put(confKey, confVal);
|
||||||
|
}
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
S4RClientConf.logger.error("Can't read the specified config properties file: {}", fileName);
|
||||||
|
ioe.printStackTrace();
|
||||||
|
} catch (final ConfigurationException cex) {
|
||||||
|
S4RClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage());
|
||||||
|
cex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, String> getS4rConfMap() { return this.s4rConfMap; }
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return new ToStringBuilder(this).
|
||||||
|
append("s4rConfMap", this.s4rConfMap).
|
||||||
|
toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
62
adapter-s4r/src/main/resources/README.md
Normal file
62
adapter-s4r/src/main/resources/README.md
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
# Overview
|
||||||
|
|
||||||
|
This NB Kafka adapter allows publishing messages to or consuming messages from
|
||||||
|
* a Kafka cluster, or
|
||||||
|
* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar.
|
||||||
|
|
||||||
|
At high level, this adapter supports the following Kafka functionalities
|
||||||
|
* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers)
|
||||||
|
* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits)
|
||||||
|
* auto message commit
|
||||||
|
* manual message commit with a configurable number of message commits in one batch
|
||||||
|
* Kafka Transaction support
|
||||||
|
|
||||||
|
## Example NB Yaml
|
||||||
|
* [kafka_producer.yaml](./s4r_producer.yaml)
|
||||||
|
*
|
||||||
|
* [kafka_consumer.yaml](./s4r_consumer.yaml)
|
||||||
|
|
||||||
|
# Usage
|
||||||
|
|
||||||
|
```bash
|
||||||
|
## Kafka Producer
|
||||||
|
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
|
||||||
|
|
||||||
|
## Kafka Consumer
|
||||||
|
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
|
||||||
|
```
|
||||||
|
|
||||||
|
## NB Kafka adapter specific CLI parameters
|
||||||
|
|
||||||
|
* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from
|
||||||
|
* For producer workload, this is the number of the producer threads to publish messages to the same topic
|
||||||
|
* Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe)
|
||||||
|
* `threads` and `num_clnt` values MUST be the same.
|
||||||
|
* For consumer workload, this is the partition number of a topic
|
||||||
|
* Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number.
|
||||||
|
* Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe)
|
||||||
|
* `threads` MUST be equal to `num_clnt`*`num_cons_grp`
|
||||||
|
|
||||||
|
* `num_cons_grp`: the number of consumer groups
|
||||||
|
* Only relevant for consumer workload
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
|
||||||
|
|
||||||
|
* `async_api` (boolean):
|
||||||
|
* When true, use async Kafka client API.
|
||||||
|
* `seq_tracking` (boolean):
|
||||||
|
* When true, a sequence number is created as part of each message's properties
|
||||||
|
* This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully.
|
||||||
|
* `seqerr_simu`:
|
||||||
|
* A list of error simulation types separated by comma (,)
|
||||||
|
* Valid error simulation types
|
||||||
|
* `out_of_order`: simulate message out of sequence
|
||||||
|
* `msg_loss`: simulate message loss
|
||||||
|
* `msg_dup`: simulate message duplication
|
||||||
|
* This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments.
|
||||||
|
* `e2e_starting_time_source`:
|
||||||
|
* Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch.
|
||||||
|
* The possible values for `e2e_starting_time_source`:
|
||||||
|
* `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type).
|
||||||
24
adapter-s4r/src/main/resources/build-nb-s4r-driver.sh
Executable file
24
adapter-s4r/src/main/resources/build-nb-s4r-driver.sh
Executable file
@@ -0,0 +1,24 @@
|
|||||||
|
#!/usr/local/bin/bash
|
||||||
|
#
|
||||||
|
# Copyright (c) 2023 nosqlbench
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
: "${SKIP_TESTS:=1}"
|
||||||
|
(
|
||||||
|
cd "$(git rev-parse --show-toplevel)" && \
|
||||||
|
mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \
|
||||||
|
[[ ${SKIP_TESTS} -ne 1 ]] && \
|
||||||
|
mvn test -pl adapters-api,adapter-s4r
|
||||||
|
)
|
||||||
5
adapter-s4r/src/main/resources/csv/binding_keys.csv
Normal file
5
adapter-s4r/src/main/resources/csv/binding_keys.csv
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
name
|
||||||
|
alpha
|
||||||
|
beta
|
||||||
|
gamma
|
||||||
|
delta
|
||||||
|
4
adapter-s4r/src/main/resources/csv/exchange_names.csv
Normal file
4
adapter-s4r/src/main/resources/csv/exchange_names.csv
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
name
|
||||||
|
usa
|
||||||
|
canada
|
||||||
|
german
|
||||||
|
3
adapter-s4r/src/main/resources/csv/queue_names.csv
Normal file
3
adapter-s4r/src/main/resources/csv/queue_names.csv
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
queue1
|
||||||
|
queue2
|
||||||
|
queue3
|
||||||
|
3
adapter-s4r/src/main/resources/csv/routing_keys.csv
Normal file
3
adapter-s4r/src/main/resources/csv/routing_keys.csv
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
name
|
||||||
|
alpha
|
||||||
|
delta
|
||||||
|
24
adapter-s4r/src/main/resources/s4r_config.properties
Normal file
24
adapter-s4r/src/main/resources/s4r_config.properties
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2023 nosqlbench
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
##
|
||||||
|
# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled
|
||||||
|
amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com
|
||||||
|
amqpSrvPort=5671
|
||||||
|
virtualHost=<as_tenant_name>/rabbitmq
|
||||||
|
jwtToken=<jwt_token_value>
|
||||||
|
# valid values: direct, fanout, topic, headers
|
||||||
|
exchangeType=direct
|
||||||
18
adapter-s4r/src/main/resources/s4r_consumer.yaml
Normal file
18
adapter-s4r/src/main/resources/s4r_consumer.yaml
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
bindings:
|
||||||
|
myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name')
|
||||||
|
myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name')
|
||||||
|
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
||||||
|
|
||||||
|
# Doc-level parameters (must be static)
|
||||||
|
params:
|
||||||
|
|
||||||
|
blocks:
|
||||||
|
msg-recv-block:
|
||||||
|
ops:
|
||||||
|
AmqpMsgReceiver:
|
||||||
|
#exchange_names: "{myexname}"
|
||||||
|
exchange_name: "alpha"
|
||||||
|
|
||||||
|
queue_name: "{myqueue}"
|
||||||
|
|
||||||
|
binding_key: "{myroutingkey}"
|
||||||
39
adapter-s4r/src/main/resources/s4r_producer.yaml
Normal file
39
adapter-s4r/src/main/resources/s4r_producer.yaml
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
bindings:
|
||||||
|
mytext_val: AlphaNumericString(100)
|
||||||
|
myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name')
|
||||||
|
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
||||||
|
|
||||||
|
|
||||||
|
# Doc-level parameters (must be static)
|
||||||
|
params:
|
||||||
|
# whether to do publisher confirm (for reliable publishing)
|
||||||
|
# - default: false
|
||||||
|
publisher_confirm: "false"
|
||||||
|
#publisher_confirm: "true"
|
||||||
|
# If 'publisher_confirm' is true, use one of the following 3 confirm modes:
|
||||||
|
# - individual (wait_for_confirm individually)
|
||||||
|
# - batch (wait_for_confirm in batch)
|
||||||
|
# - async [default]
|
||||||
|
confirm_mode: "aysnc"
|
||||||
|
#confirm_mode: "individual"
|
||||||
|
#confirm_mode: "batch"
|
||||||
|
|
||||||
|
# Only relevant when 'publisher_confirm' is true and 'confirm_mode' is "batch"
|
||||||
|
confirm_batch_num: 100
|
||||||
|
# default timeout value (in milliseconds)
|
||||||
|
# - only relevant when publisher_confirm' is true and 'confirm_mode' is NOT "async"
|
||||||
|
dft_confirm_timeout_ms: 1000
|
||||||
|
|
||||||
|
|
||||||
|
blocks:
|
||||||
|
msg-send-block:
|
||||||
|
ops:
|
||||||
|
AmqpMsgSender:
|
||||||
|
#exchange_names: "{myexname}"
|
||||||
|
exchange_names: "alpha"
|
||||||
|
|
||||||
|
routing_key: "{myroutingkey}"
|
||||||
|
|
||||||
|
## (Optional) Kafka message value.
|
||||||
|
# - message key and value can't be both empty at the same time
|
||||||
|
message: "{mytext_val}"
|
||||||
36
adapter-s4r/src/main/resources/start_s4r_consumer.sh
Executable file
36
adapter-s4r/src/main/resources/start_s4r_consumer.sh
Executable file
@@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/local/bin/bash
|
||||||
|
#
|
||||||
|
# Copyright (c) 2023 nosqlbench
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
: "${REBUILD:=1}"
|
||||||
|
: "${CYCLES:=1000000000}"
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
|
||||||
|
if [[ ${REBUILD} -eq 1 ]]; then
|
||||||
|
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
|
||||||
|
fi
|
||||||
|
java -jar nb5/target/nb5.jar \
|
||||||
|
run \
|
||||||
|
driver=s4r \
|
||||||
|
-vv \
|
||||||
|
--report-interval 5 \
|
||||||
|
--docker-metrics \
|
||||||
|
cycles=${CYCLES} \
|
||||||
|
threads=1 \
|
||||||
|
num_clnt=1 \
|
||||||
|
num_cons_grp=1 \
|
||||||
|
yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \
|
||||||
|
config="${SCRIPT_DIR}/kafka_config.properties" \
|
||||||
|
bootstrap_server=PLAINTEXT://localhost:9092
|
||||||
38
adapter-s4r/src/main/resources/start_s4r_producer.sh
Executable file
38
adapter-s4r/src/main/resources/start_s4r_producer.sh
Executable file
@@ -0,0 +1,38 @@
|
|||||||
|
#!/usr/local/bin/bash
|
||||||
|
#
|
||||||
|
# Copyright (c) 2023 nosqlbench
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
: "${REBUILD:=1}"
|
||||||
|
: "${CYCLES:=1000000000}"
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
|
||||||
|
if [[ ${REBUILD} -eq 1 ]]; then
|
||||||
|
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
|
||||||
|
fi
|
||||||
|
while [[ 1 -eq 1 ]]; do
|
||||||
|
java -jar nb5/target/nb5.jar \
|
||||||
|
run \
|
||||||
|
driver=s4r \
|
||||||
|
-vv \
|
||||||
|
--report-interval 5 \
|
||||||
|
--docker-metrics \
|
||||||
|
cycles="${CYCLES}" \
|
||||||
|
threads=1 \
|
||||||
|
num_clnt=1 \
|
||||||
|
yaml="${SCRIPT_DIR}/kafka_producer.yaml" \
|
||||||
|
config="${SCRIPT_DIR}/kafka_config.properties" \
|
||||||
|
bootstrap_server=PLAINTEXT://localhost:9092
|
||||||
|
sleep 10
|
||||||
|
done
|
||||||
@@ -113,6 +113,12 @@
|
|||||||
<version>${revision}</version>
|
<version>${revision}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>adapter-s4r</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.nosqlbench</groupId>
|
<groupId>io.nosqlbench</groupId>
|
||||||
<artifactId>adapter-jdbc</artifactId>
|
<artifactId>adapter-jdbc</artifactId>
|
||||||
|
|||||||
1
pom.xml
1
pom.xml
@@ -65,6 +65,7 @@
|
|||||||
<module.adapter-pulsar>adapter-pulsar</module.adapter-pulsar>
|
<module.adapter-pulsar>adapter-pulsar</module.adapter-pulsar>
|
||||||
<module.adapter-s4j>adapter-s4j</module.adapter-s4j>
|
<module.adapter-s4j>adapter-s4j</module.adapter-s4j>
|
||||||
<module.adapter-kafka>adapter-kafka</module.adapter-kafka>
|
<module.adapter-kafka>adapter-kafka</module.adapter-kafka>
|
||||||
|
<module.adapter-kafka>adapter-s4r</module.adapter-kafka>
|
||||||
<module.adapter-jdbc>adapter-jdbc</module.adapter-jdbc>
|
<module.adapter-jdbc>adapter-jdbc</module.adapter-jdbc>
|
||||||
|
|
||||||
<!-- VIRTDATA MODULES -->
|
<!-- VIRTDATA MODULES -->
|
||||||
|
|||||||
Reference in New Issue
Block a user