Merge branch 'main' of github.com:nosqlbench/nosqlbench

Jonathan Shook 2023-02-28 16:52:52 -06:00
commit e76144d3f9
15 changed files with 262 additions and 69 deletions

View File

@@ -65,7 +65,7 @@ jobs:
- name: build preview revision
run: |
-mvn clean verify -Drevision="${{ env.PREVIEW_VERSION }}"
+mvn clean verify -Drevision="${{ env.PREVIEW_VERSION }}" -P enable-container-tests
- name: Setup docker buildx
uses: docker/setup-buildx-action@v2.2.1

View File

@@ -62,7 +62,7 @@ jobs:
- name: build release revision
run: |
-mvn clean verify -Drevision="${{ env.RELEASE_VERSION }}"
+mvn clean package -Drevision="${{ env.RELEASE_VERSION }}" -P enable-container-tests
- name: Setup docker buildx
uses: docker/setup-buildx-action@v2.2.1

View File

@@ -22,11 +22,11 @@ import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,9 +35,9 @@ import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Predicate;
-public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
+public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> implements NBNamedElement {
-private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
+private final static Logger logger = LogManager.getLogger("KafkaBaseOpDispenser");
protected final ParsedOp parsedOp;
protected final KafkaAdapterMetrics kafkaAdapterMetrics;
@@ -69,7 +69,7 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
this.kafkaSpace = kafkaSpace;
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
-this.kafkaAdapterMetrics = new KafkaAdapterMetrics(defaultMetricsPrefix);
+this.kafkaAdapterMetrics = new KafkaAdapterMetrics(this, defaultMetricsPrefix);
kafkaAdapterMetrics.initS4JAdapterInstrumentation();
this.asyncAPI =
@@ -132,4 +132,9 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
return stringLongFunction;
}
+@Override
+public String getName() {
+return "KafkaBaseOpDispenser";
+}
}
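Implementing NBNamedElement is what lets KafkaAdapterMetrics (below) register its instruments against the dispenser. A minimal sketch of the adopted contract, assuming the interface declares nothing beyond a name accessor:

    public interface NBNamedElement {
        String getName();
    }

Since getName() returns a constant here, every concrete Kafka dispenser reports the same element name to the metrics registry.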

View File

@@ -21,6 +21,7 @@ import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
+import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
@@ -49,6 +50,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
// - This is only relevant when the effective setting (global level and statement level)
// of "enable.auto.commit" is false
protected final int maxMsgCntPerCommit;
+private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
protected boolean autoCommitEnabled;
@@ -76,6 +78,8 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit"));
}
}
+this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
+KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
}
private String getEffectiveGroupId(long cycle) {
@@ -119,7 +123,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
}
opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
-kafkaSpace, asyncAPI, msgPollIntervalInSec, autoCommitEnabled, maxMsgCntPerCommit, consumer);
+kafkaSpace,
+asyncAPI,
+msgPollIntervalInSec,
+autoCommitEnabled,
+maxMsgCntPerCommit,
+consumer,
+EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
+kafkaAdapterMetrics
+);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
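The new doc-level parameter arrives as a plain string and is resolved per cycle with Enum.valueOf on the upper-cased value, as in the constructor call above. A hedged sketch of that resolution (the local variable names are invented for illustration):

    String raw = e2eStartTimeSrcParamStrFunc.apply(cycle); // defaults to "none"
    EndToEndStartingTimeSource src =
        EndToEndStartingTimeSource.valueOf(raw.toUpperCase());

Note that valueOf throws IllegalArgumentException for any unrecognized value, so a typo in e2e_starting_time_source fails the op rather than silently falling back to NONE.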

View File

@@ -17,7 +17,10 @@
package io.nosqlbench.adapter.kafka.ops;
+import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
@@ -30,7 +33,7 @@ import java.util.Map;
public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");
+private final EndToEndStartingTimeSource e2eStartingTimeSrc;
private final int msgPoolIntervalInMs;
private final boolean asyncMsgCommit;
private final boolean autoCommitEnabled;
@@ -40,19 +43,24 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final ThreadLocal<Integer> manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0);
private final KafkaConsumer<String, String> consumer;
+private Histogram e2eMsgProcLatencyHistogram;
public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace,
boolean asyncMsgCommit,
int msgPoolIntervalInMs,
boolean autoCommitEnabled,
int maxMsgCntPerCommit,
-KafkaConsumer<String, String> consumer) {
+KafkaConsumer<String, String> consumer,
+EndToEndStartingTimeSource e2eStartingTimeSrc,
+KafkaAdapterMetrics kafkaAdapterMetrics) {
super(kafkaSpace);
this.msgPoolIntervalInMs = msgPoolIntervalInMs;
this.asyncMsgCommit = asyncMsgCommit;
this.autoCommitEnabled = autoCommitEnabled;
this.maxMsgCntPerCommit = maxMsgCntPerCommit;
this.consumer = consumer;
+this.e2eStartingTimeSrc = e2eStartingTimeSrc;
+this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram();
}
public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); }
@@ -116,10 +124,11 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
if (record != null) {
if (logger.isDebugEnabled()) {
logger.debug(
-"Receiving message is successful: [{}] - offset({}), cycle ({})",
+"Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({})",
printRecvedMsg(record),
record.offset(),
-cycle);
+cycle,
+System.currentTimeMillis() - record.timestamp());
}
if (!autoCommitEnabled) {
@@ -127,7 +136,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
if (bCommitMsg) {
if (!asyncMsgCommit) {
consumer.commitSync();
+updateE2ELatencyMetric(record);
if (logger.isDebugEnabled()) {
logger.debug(
"Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})",
@@ -144,6 +153,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
"Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})",
cycle,
maxMsgCntPerCommit);
+updateE2ELatencyMetric(record);
} else {
logger.debug(
"Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})",
@@ -157,15 +167,30 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
}
resetManualCommitTrackingCnt();
-} else {
+} else {
+updateE2ELatencyMetric(record);
incManualCommitTrackingCnt();
}
}
+updateE2ELatencyMetric(record);
}
}
}
}
+private void updateE2ELatencyMetric(ConsumerRecord<String, String> record) {
+long startTimeStamp = 0L;
+switch (e2eStartingTimeSrc) {
+case MESSAGE_PUBLISH_TIME:
+startTimeStamp = record.timestamp();
+break;
+}
+if (startTimeStamp != 0L) {
+long e2eMsgLatency = System.currentTimeMillis() - startTimeStamp;
+e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
+}
+}
@Override
public void close() {
try {
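A caveat on the MESSAGE_PUBLISH_TIME path: record.timestamp() carries the producer-side publish time only when the topic's message.timestamp.type is CreateTime (the Kafka default); under LogAppendTime the broker overwrites it at append time, and in either case the subtraction assumes producer and consumer clocks are in sync. A hedged guard using the real ConsumerRecord#timestampType() accessor could make that explicit:

    import org.apache.kafka.common.record.TimestampType;

    // update only for CreateTime stamps; LogAppendTime would skew the histogram
    if (record.timestampType() == TimestampType.CREATE_TIME) {
        e2eMsgProcLatencyHistogram.update(System.currentTimeMillis() - record.timestamp());
    }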

View File

@@ -0,0 +1,23 @@
package io.nosqlbench.adapter.kafka.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public enum EndToEndStartingTimeSource {
NONE, // no end-to-end latency calculation
MESSAGE_PUBLISH_TIME, // use message publish timestamp
}

View File

@@ -15,8 +15,10 @@
*/
package io.nosqlbench.adapter.kafka.util;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
+import io.nosqlbench.adapter.kafka.dispensers.KafkaBaseOpDispenser;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
@@ -31,8 +33,23 @@ public class KafkaAdapterMetrics implements NBNamedElement {
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
+// - message out of sequence error counter
+private Counter msgErrOutOfSeqCounter;
+// - message loss counter
+private Counter msgErrLossCounter;
+// - message duplicate error counter
+private Counter msgErrDuplicateCounter;
-public KafkaAdapterMetrics(String defaultMetricsPrefix) {
+public Histogram getE2eMsgProcLatencyHistogram() {
+return e2eMsgProcLatencyHistogram;
+}
+// end-to-end latency
+private Histogram e2eMsgProcLatencyHistogram;
+private KafkaBaseOpDispenser kafkaBaseOpDispenser;
+public KafkaAdapterMetrics(KafkaBaseOpDispenser kafkaBaseOpDispenser, String defaultMetricsPrefix) {
+this.kafkaBaseOpDispenser = kafkaBaseOpDispenser;
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
@@ -60,6 +77,27 @@ public class KafkaAdapterMetrics implements NBNamedElement {
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
+// End-to-end metrics
+// Latency
+this.e2eMsgProcLatencyHistogram =
+ActivityMetrics.histogram(
+kafkaBaseOpDispenser,
+defaultAdapterMetricsPrefix + "e2e_msg_latency",
+ActivityMetrics.DEFAULT_HDRDIGITS);
+// Error metrics
+this.msgErrOutOfSeqCounter =
+ActivityMetrics.counter(
+kafkaBaseOpDispenser,
+defaultAdapterMetricsPrefix + "err_msg_oos");
+this.msgErrLossCounter =
+ActivityMetrics.counter(
+kafkaBaseOpDispenser,
+defaultAdapterMetricsPrefix + "err_msg_loss");
+this.msgErrDuplicateCounter =
+ActivityMetrics.counter(
+kafkaBaseOpDispenser,
+defaultAdapterMetricsPrefix + "err_msg_dup");
}
public Timer getBindTimer() { return bindTimer; }
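With the shared prefix, the new instruments should surface as <prefix>e2e_msg_latency, <prefix>err_msg_oos, <prefix>err_msg_loss, and <prefix>err_msg_dup. The three error counters are registered but not incremented in any file of this commit; a hedged sketch of how a consumer-side sequence check might drive them (the sequence-tracking names are hypothetical, written as if inside this class):

    // hypothetical ordering check against a tracked per-partition sequence number
    long expectedSeq = lastSeenSeq + 1;
    if (actualSeq > expectedSeq) {
        msgErrLossCounter.inc();        // gap in sequence: possible message loss
        msgErrOutOfSeqCounter.inc();    // and out of the expected order
    } else if (actualSeq < expectedSeq) {
        msgErrDuplicateCounter.inc();   // already-seen sequence: duplicate delivery
    }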

View File

@@ -41,7 +41,8 @@ public class KafkaAdapterUtil {
// Valid document level parameters for JMS NB yaml file
public enum DOC_LEVEL_PARAMS {
// Blocking message producing or consuming
-ASYNC_API("async_api");
+ASYNC_API("async_api"),
+E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
public final String label;
DOC_LEVEL_PARAMS(String label) {

View File

@@ -3,7 +3,8 @@ params:
# Whether to commit message asynchronously
# - default: true
# - only relevant for manual commit
-async_api: "true"
+# async_api: "true"
+e2e_starting_time_source: "message_publish_time"
blocks:
msg-consume-block:

View File

@@ -1,7 +1,7 @@
package io.nosqlbench.adapter.pulsar.util;
/*
-* Copyright (c) 2022 nosqlbench
+* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@@ -37,7 +37,7 @@
</description>
<properties>
-<s4j.version>3.2.0</s4j.version>
+<s4j.version>3.2.1</s4j.version>
</properties>
<dependencies>

View File

@@ -300,7 +300,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
-<version>1.12.407</version>
+<version>1.12.415</version>
</dependency>
<dependency>

View File

@@ -17,27 +17,17 @@
package io.nosqlbench.nb5.proof;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
-import io.nosqlbench.engine.api.scenarios.NBCLIScenarioParser;
-import io.nosqlbench.engine.api.scenarios.WorkloadDesc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.utility.DockerImageName;
import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class WorkloadContainerVerifications {
private enum Driver {
@@ -57,26 +47,35 @@ public class WorkloadContainerVerifications {
}
}
public static Logger logger = LogManager.getLogger(WorkloadContainerVerifications.class);
private final String java = Optional.ofNullable(System.getenv(
"JAVA_HOME")).map(v -> v + "/bin/java").orElse("java");
private final static String JARNAME = "../nb5/target/nb5.jar";
private final static String BASIC_CHECK_IDENTIFIER = "basic_check";
private static final CassandraContainer cass = (CassandraContainer) new CassandraContainer(DockerImageName.parse("cassandra:latest"))
.withExposedPorts(9042).withAccessToHost(true);
-private static Map<Driver, List<WorkloadDesc>> basicWorkloadsMapPerDriver = null;
+private static Map<Driver, List<String>> basicWorkloadsMapPerDriver = null;
@BeforeAll
public static void listWorkloads() {
//List the tests we would like to run
ProcessInvoker invoker = new ProcessInvoker();
//STEP1: Copy the example workload to the local dir
+ProcessResult listResult = invoker.run("list-workloads", 30,
+"java", "-jar", JARNAME, "--list-workloads", "--include=examples"
+);
+assertThat(listResult.exception).isNull();
+String listOut = String.join("\n", listResult.getStdoutData());
-List<WorkloadDesc> workloads = List.of();
+// Define the regular expression pattern
+String regex = "/(.+?/)+.+?\\.yaml";
+Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
+Matcher matcher = pattern.matcher(listOut);
+ArrayList<String> matchedPaths = new ArrayList<>();
+while (matcher.find()) {
+matchedPaths.add(matcher.group());
+}
basicWorkloadsMapPerDriver = new HashMap<>();
-try {
-workloads = NBCLIScenarioParser.getWorkloadsWithScenarioScripts(true, "examples");
-} catch (Exception e) {
-throw new RuntimeException("Error while getting workloads:" + e.getMessage(), e);
-}
-for (Driver driver : Driver.values()) {
-basicWorkloadsMapPerDriver.put(driver, getBasicCheckWorkloadsForDriver(workloads, BASIC_CHECK_IDENTIFIER, driver.getName()));
-}
+getBasicCheckWorkloadsForEachDriver(matchedPaths, BASIC_CHECK_IDENTIFIER);
}
@@ -90,11 +89,10 @@ public class WorkloadContainerVerifications {
else if (basicWorkloadsMapPerDriver.get(Driver.CQL).size() == 0)
return;
-for(WorkloadDesc workloadDesc : basicWorkloadsMapPerDriver.get(Driver.CQL))
+for(String workloadPath : basicWorkloadsMapPerDriver.get(Driver.CQL))
{
-String path = workloadDesc.getYamlPath();
-int lastSlashIndex = path.lastIndexOf('/');
-String shortName = path.substring(lastSlashIndex + 1);
+int lastSlashIndex = workloadPath.lastIndexOf('/');
+String shortName = workloadPath.substring(lastSlashIndex + 1);
if(shortName.equals("cql-iot-dse.yaml"))
continue;
//STEP0:Start the test container and expose the 9042 port on the local host.
@@ -111,7 +109,7 @@ public class WorkloadContainerVerifications {
String hostIP = cass.getHost();
-//STEP1: Run the example cassandra workload using the schema tag to create the Cass Baselines keyspace
+//STEP1: Run the example cassandra workload using the schema tag to create the Cass Baselines keyspace
String[] args = new String[]{
"java", "-jar", JARNAME, shortName, BASIC_CHECK_IDENTIFIER, "host="+ hostIP, "localdc="+ datacenter, "port="+ mappedPort9042.toString(), "table=keyvalue", "keyspace=baselines"
};
@@ -172,40 +170,62 @@ public class WorkloadContainerVerifications {
// System.out.println(e.getMessage());
// fail();
// } finally {
-cass.stop();
+cass.stop();
// }
}
}
@AfterEach
public void cleanup(){
System.out.println("setup");
}
/*
This method filters the input list of workloads to output the subset of workloads
-that include a specific scenario (input) and run the specified driver
+that include a specific scenario (input) and maps all workloads with that scenario to
+a key which is their common driver
*/
-public static List<WorkloadDesc> getBasicCheckWorkloadsForDriver(List<WorkloadDesc> workloads ,String scenarioFilter, String driver) {
-String substring = "driver=" + driver;
-ArrayList<WorkloadDesc> workloadsForDriver = new ArrayList<>();
-for (WorkloadDesc workload : workloads) {
-if(workload.getScenarioNames().contains(scenarioFilter)) {
-try {
-Path yamlPath = Path.of(workload.getYamlPath());
-List<String> lines = Files.readAllLines(yamlPath);
-for (String line : lines) {
-if (line.contains(substring)) {
-workloadsForDriver.add(workload);
+private static void getBasicCheckWorkloadsForEachDriver(List<String> workloadPaths ,String scenarioFilter) {
+for (String workloadPath : workloadPaths) {
+try {
+int lastSlashIndex = workloadPath.lastIndexOf('/');
+String shortName = workloadPath.substring(lastSlashIndex + 1);
+String[] args = new String[]{
+"java", "-jar", JARNAME, shortName, scenarioFilter, "--show-script"
+};
+ProcessInvoker invoker = new ProcessInvoker();
+ProcessResult runShowScriptResult = invoker.run("run-show-script", 10, args);
+assertThat(runShowScriptResult.exception).isNull();
+String listOut = String.join("\n", runShowScriptResult.getStdoutData());
+Pattern pattern = Pattern.compile("'driver':\\s*'(.+?)'");
+// Use the Matcher class to find the substring in the output script that defines the driver
+Matcher matcher = pattern.matcher(listOut);
+if (matcher.find()) {
+String scenarioDriverValue = matcher.group(1);
+for (Driver driverType : Driver.values())
+{
+if(driverType.getName().equals(scenarioDriverValue))
+{
+if(basicWorkloadsMapPerDriver.containsKey(driverType))
+{
+List<String> currentList = basicWorkloadsMapPerDriver.get(driverType);
+// Modify the list by adding new strings to it
+currentList.add(workloadPath);
+// Put the updated list back into the HashMap using the same key
+basicWorkloadsMapPerDriver.put(driverType, currentList);
+}
+else
+{
+List<String> pathList = new ArrayList<>();
+pathList.add(workloadPath);
+basicWorkloadsMapPerDriver.put(driverType, pathList);
+}
+break;
+}
+}
-} catch (Exception e) {
-logger.error("Error reading file " + workload.getYamlPath() + ": " + e.getMessage());
-break;
-}
+} catch (Exception e) {
+logger.error("Error reading file " + workloadPath + ": " + e.getMessage());
+break;
+}
}
-return workloadsForDriver;
}
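The two regular expressions carry the whole refactor: the first plucks workload YAML paths out of --list-workloads output, the second pulls the driver name out of the scenario script printed by --show-script. A self-contained illustration of both patterns (the sample input lines are invented):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RegexSketch {
        public static void main(String[] args) {
            // path pattern from listWorkloads()
            Matcher path = Pattern.compile("/(.+?/)+.+?\\.yaml", Pattern.CASE_INSENSITIVE)
                .matcher("# /activities/baselines/cql-keyvalue.yaml");
            if (path.find()) System.out.println(path.group()); // /activities/baselines/cql-keyvalue.yaml

            // driver pattern from getBasicCheckWorkloadsForEachDriver()
            Matcher driver = Pattern.compile("'driver':\\s*'(.+?)'")
                .matcher("'driver': 'cql',");
            if (driver.find()) System.out.println(driver.group(1)); // cql
        }
    }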

View File

@@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.library.basics.shared.conversions.from_string;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.util.UUID;
import java.util.function.Function;
/**
* Convert the incoming String value to the equivalent UUID with {@link UUID#fromString(String)}
*/
@Categories(Category.datetime)
@ThreadSafeMapper
public class ToUUID implements Function<String, UUID> {
@Override
public UUID apply(String s) {
return UUID.fromString(s);
}
}
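Because the function simply delegates, malformed input surfaces as the standard IllegalArgumentException from UUID.fromString() rather than a null; the unit test below covers the happy path:

    UUID ok = new ToUUID().apply("ad3b5598-8620-41f7-9631-3ae47c7aa465");
    // new ToUUID().apply("not-a-uuid") would throw IllegalArgumentException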

View File

@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.library.basics.shared.conversions.from_string;
import org.junit.jupiter.api.Test;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
public class ToUUIDTest {
@Test
public void testStringToUUID() {
assertThat(new ToUUID().apply("ad3b5598-8620-41f7-9631-3ae47c7aa465"))
.isEqualTo(UUID.fromString("ad3b5598-8620-41f7-9631-3ae47c7aa465"));
}
}