First Phase of Moving to Containers by DriverType

MikeYaacoubStax
2023-02-10 00:33:54 -05:00
parent ed4be4ee4b
commit e67cc265ff
14 changed files with 236 additions and 174 deletions

View File

@@ -4,6 +4,10 @@ scenarios:
     schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   machine_id: Mod(<<sources:10000>>); ToHashedUUID() -> java.util.UUID
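
The basic_check scenarios added across these workloads are ordinary named scenarios, so they can be selected positionally on the nb5 command line. A minimal sketch of such an invocation, assuming a local nb5.jar and a reachable Cassandra; the workload name, host, and port here are illustrative only:

public class RunBasicCheck {
    public static void main(String[] argv) throws Exception {
        // Select the basic_check scenario by name; named parameters after it
        // override the TEMPLATE(...) defaults declared in the workload YAML.
        String[] args = {
            "java", "-jar", "nb5.jar", "cql-keyvalue2.yaml", "basic_check",
            "host=127.0.0.1", "localdc=datacenter1", "port=9042"
        };
        Process p = new ProcessBuilder(args).inheritIO().start();
        System.exit(p.waitFor());
    }
}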

View File

@@ -12,6 +12,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 params:
   instrument: TEMPLATE(instrument,false)
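
The TEMPLATE(name,default) forms above resolve to a command-line parameter when one is supplied and otherwise fall back to the declared default, which is how basic_check pins its cycle counts to 10. A rough sketch of that resolution rule, with a hypothetical parameter map standing in for nb5's real parser:

import java.util.Map;

public class TemplateResolution {
    // Hypothetical stand-in for template resolution: a named parameter
    // from the command line wins, otherwise the declared default applies.
    static String resolve(Map<String, String> cliParams, String name, String dflt) {
        return cliParams.getOrDefault(name, dflt);
    }

    public static void main(String[] argv) {
        Map<String, String> cli = Map.of("rampup-cycles", "50");
        System.out.println(resolve(cli, "rampup-cycles", "10")); // 50 (overridden)
        System.out.println(resolve(cli, "main-cycles", "10"));   // 10 (default)
    }
}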

View File

@@ -10,6 +10,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
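
The seq_key binding is a per-cycle function: the cycle number is reduced modulo the keycount and rendered as a string. A minimal sketch of the equivalent computation, assuming the straightforward Mod/ToString semantics these workloads rely on:

public class SeqKeyBinding {
    // Equivalent of: Mod(<<keycount:1000000000>>); ToString() -> String
    static String seqKey(long cycle, long keycount) {
        return Long.toString(cycle % keycount); // Mod, then ToString
    }

    public static void main(String[] argv) {
        System.out.println(seqKey(12_345L, 1_000_000_000L)); // "12345"
    }
}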

View File

@@ -17,6 +17,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags=='block:main-.*' cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags=='block:main-.*' cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   seq_key: Mod(<<keycount:1000000000>>); ToString() -> String

View File

@@ -13,6 +13,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 params:
   x: y

View File

@@ -10,6 +10,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:main-*.* cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:main-*.* cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   # for ramp-up and verify

View File

@@ -18,6 +18,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString();

View File

@@ -18,6 +18,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 bindings:
   seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String

View File

@@ -34,6 +34,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 params:
   instrument: true

View File

@@ -12,6 +12,10 @@ scenarios:
     schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
     rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
     main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
+  basic_check:
+    schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
+    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
+    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
 params:
   instrument: TEMPLATE(instrument,false)

View File

@@ -19,10 +19,8 @@ blocks:
         insert: "<<collection:keyvalue>>",
         documents: [ { _id: {seq_key}, value: {seq_value} } ]
       }
+      params:
+        readPreference: primary
       tags:
         name: rampup-insert
-      params:
-        readPreference: primary
   main-read:
     params:
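
For reference, readPreference: primary is the MongoDB driver's routing hint that reads should go to the primary replica. A sketch of the same setting applied directly through the Java driver (assuming mongodb-driver-sync on the classpath); the connection string and names are illustrative only:

import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class PrimaryReads {
    public static void main(String[] argv) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // Route this collection's reads to the primary, matching the
            // readPreference: primary op param in the workload above.
            MongoCollection<Document> coll = client.getDatabase("baselines")
                .getCollection("keyvalue")
                .withReadPreference(ReadPreference.primary());
            System.out.println(coll.countDocuments());
        }
    }
}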

View File

@@ -32,6 +32,7 @@ bindings:
   match1: Identity(); CoinFunc(<<match-ratio>>, FixedValue(0), FixedValue(1000))
   match2: Identity(); CoinFunc(<<match-ratio>>, FixedValue("true"), FixedValue("false"))
+  # Being removed because we are using the new JSON structure
   additional_fields: ListSizedStepped(<<docpadding:0>>,Template("\"{}\":{}",Identity(),Identity())); ToString(); ReplaceAll('\[\"', ',\"'); ReplaceAll('\[', ''); ReplaceAll('\]', '') -> String
 blocks:
@@ -83,9 +84,9 @@ blocks:
         {
           "_id": "{seq_key}",
           "user_id": "{user_id}",
-          "created_on": {created_on},
+          "created_on": "{created_on}",
           "full_name": "{full_name}",
-          "married": {married},
+          "married": "{married}",
           "address": {
             "primary": {
               "city": "{city}",
@@ -94,18 +95,17 @@ blocks:
"secondary": {}
},
"coordinates": [
{lat},
{lng}
"{lat}",
"{lng}"
],
"children": [],
"friends": [
"{friend_id}"
],
"debt": null,
"match1": {match1},
"match1": "{match1}",
"match2": "{match2}",
"match3": {match2}
{additional_fields}
"match3": "{match2}"
}
]
}
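
One plausible reading of the quoting above: a bare {placeholder} is only valid JSON when the binding emits a number or boolean literal, while a quoted "{placeholder}" stays parseable for any generated value. A small sketch of the difference, assuming Jackson for parsing; the field values are illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;

public class QuotedPlaceholders {
    public static void main(String[] argv) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Valid either way once the binding fills in the value:
        System.out.println(mapper.readTree("{\"married\": true}"));     // bare literal
        System.out.println(mapper.readTree("{\"married\": \"false\"}")); // quoted string
    }
}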

View File

@@ -20,7 +20,7 @@
     <parent>
         <artifactId>mvn-defaults</artifactId>
         <groupId>io.nosqlbench</groupId>
-        <version>5.17.0-SNAPSHOT</version>
+        <version>5.17.1-SNAPSHOT</version>
         <relativePath>../mvn-defaults</relativePath>
     </parent>
@@ -51,7 +51,7 @@
         <dependency>
             <groupId>io.nosqlbench</groupId>
             <artifactId>engine-cli</artifactId>
-            <version>5.17.0-SNAPSHOT</version>
+            <version>5.17.1-SNAPSHOT</version>
         </dependency>
     </dependencies>

View File

@@ -17,86 +17,81 @@
 package io.nosqlbench.nb5.proof;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
+import io.nosqlbench.engine.api.scenarios.NBCLIScenarioParser;
+import io.nosqlbench.engine.api.scenarios.WorkloadDesc;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.junit.Ignore;
 import org.junit.jupiter.api.*;
 import org.rnorth.ducttape.unreliables.Unreliables;
 import org.testcontainers.containers.CassandraContainer;
 import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
 import org.testcontainers.utility.DockerImageName;
-import java.io.BufferedReader;
 import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 @Ignore
 public class CassandraContainersIntegrationTest {
+    private enum Driver {
+        CQL("cql"),
+        HTTP("http"),
+        MONGODB("mongodb"),
+        TCP("tcp"),
+        PULSAR("pulsar"),
+        DYNAMODB("dynamo"),
+        KAFKA("kafka");
+        private final String name;
+        Driver(String name) {
+            this.name = name;
+        }
+        public String getName() {
+            return name;
+        }
+    }
     public static Logger logger = LogManager.getLogger(CassandraContainersIntegrationTest.class);
     private final String java = Optional.ofNullable(System.getenv(
         "JAVA_HOME")).map(v -> v + "/bin/java").orElse("java");
     private final static String JARNAME = "../nb5/target/nb5.jar";
     // private static GenericContainer cass= new CassandraContainer("cassandra").withExposedPorts(9042);
-    private static final ArrayList<String> matchedPaths = new ArrayList<>(); // the list of paths from list-workloads
     private static String hostIP = "127.0.0.1"; // the host IP of the Cassandra container
     private static String datacenter = "datacenter1"; // the default datacenter name
     private static Integer mappedPort9042 = 9042; // the host port mapped to the container's exposed 9042
+    private final static String BASIC_CHECK_IDENTIFIER = "basic_check";
     private static final CassandraContainer cass = (CassandraContainer) new CassandraContainer(DockerImageName.parse("cassandra:latest"))
         .withExposedPorts(9042).withAccessToHost(true);
+    private static Map<Driver, List<WorkloadDesc>> basicWorkloadsMapPerDriver = null;
     //.waitingFor(new CassandraWaitStrategy());
     @BeforeAll
-    public static void initContainer() {
-        // List the tests we would like to run
-        ProcessInvoker invoker = new ProcessInvoker();
-        // STEP 1: List the example workloads
-        ProcessResult listResult = invoker.run("list-workloads", 30,
-            "java", "-jar", JARNAME, "--list-workloads", "--include=examples"
-        );
-        assertThat(listResult.exception).isNull();
-        String listOut = String.join("\n", listResult.getStdoutData());
-        List<String> results = new ArrayList<>();
-        // Define the regular expression pattern
-        String regex = "/(.+?/)+.+?\\.yaml";
-        Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
-        Matcher matcher = pattern.matcher(listOut);
-        while (matcher.find()) {
-            matchedPaths.add(matcher.group());
-        }
-        System.out.println("Matched paths:");
-        for (String path : matchedPaths) {
-            System.out.println(path);
-        }
-    }
+    public static void listWorkloads() {
+        List<WorkloadDesc> workloads = List.of();
+        basicWorkloadsMapPerDriver = new HashMap<>();
+        try {
+            workloads = NBCLIScenarioParser.getWorkloadsWithScenarioScripts(true, "examples");
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting workloads: " + e.getMessage(), e);
+        }
+        for (Driver driver : Driver.values()) {
+            basicWorkloadsMapPerDriver.put(driver, getBasicCheckWorkloadsForDriver(workloads, BASIC_CHECK_IDENTIFIER, driver.getName()));
+        }
+    }
     @BeforeEach
     public void setUp() {
         // STEP 0: Start the test container and expose port 9042 on the local host,
         // so that the Docker bridge exposes it to the process invoker that runs nb5
         // against Cassandra in that container.
         cass.start();
         datacenter = cass.getLocalDatacenter();
         // When running with a local Docker daemon, exposed ports will usually be reachable on localhost.
         // However, in some CI environments they may instead be reachable on a different host.
         mappedPort9042 = cass.getMappedPort(9042);
         hostIP = cass.getHost();
         System.out.println("setup");
     }
@@ -105,108 +100,137 @@ public class CassandraContainersIntegrationTest {
         ProcessInvoker invoker = new ProcessInvoker();
         invoker.setLogDir("logs/test");
+        if (basicWorkloadsMapPerDriver.get(Driver.CQL) == null)
+            return;
+        else if (basicWorkloadsMapPerDriver.get(Driver.CQL).size() == 0)
+            return;
-        // STEP 1: Copy the example workload to the local dir
-        ProcessResult copyResult = invoker.run("copy-workload", 30,
-            "java", "-jar", JARNAME, "--copy=/activities/baselinesv2/cql-keyvalue2.yaml"
-        );
-        assertThat(copyResult.exception).isNull();
-        String copyOut = String.join("\n", copyResult.getStdoutData());
-        // STEP 2: Run the example Cassandra workload, using the schema tag to create the baselines keyspace
-        String[] args = new String[]{
-            "java", "-jar", JARNAME, "cql-keyvalue2.yaml", "default", "host="+hostIP, "localdc="+datacenter, "port="+mappedPort9042.toString(), "rampup-cycles=10", "main-cycles=10"
-        };
-        logger.info("The final command line: " + String.join(" ", args));
-        ProcessResult runSchemaResult = invoker.run("run-workload", 30, args);
-        // STEP 3: Check runSchemaOut for errors
-        logger.info("Checking if the NB5 command resulted in any errors...");
-        assertThat(runSchemaResult.exception).isNull();
-        String runSchemaOut = String.join("\n", runSchemaResult.getStdoutData());
-        assertThat(runSchemaOut.toLowerCase()).doesNotContain("error");
-        logger.info("NB5 command completed with no errors");
-        // STEP 4: Check the cluster for created data
-        try (CqlSession session = CqlSession.builder().addContactPoint(new InetSocketAddress(hostIP, mappedPort9042)).withLocalDatacenter(datacenter).build()) {
-            // -> Check for the creation of the keyspace "baselines"
-            logger.info("Checking for the creation of the keyspace \"baselines\"...");
-            ResultSet result = session.execute("SELECT keyspace_name FROM system_schema.keyspaces");
-            List<Row> rows = result.all();
-            boolean keyspaceFound = false;
-            for (Row row : rows) {
-                if (row.getString("keyspace_name").equals("baselines")) {
-                    keyspaceFound = true;
-                    break;
-                }
-            }
-            assertTrue(keyspaceFound);
-            logger.info("Keyspace \"baselines\" was found; the nb5 command created it successfully");
-            // -> Check for the creation of the table "baselines.keyvalue"
-            logger.info("Checking for the creation of the table \"baselines.keyvalue\"...");
-            result = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name='baselines'");
-            rows = result.all();
-            boolean tableFound = false;
-            for (Row row : rows) {
-                if (row.getString("table_name").equals("keyvalue")) {
-                    tableFound = true;
-                    break;
-                }
-            }
-            assertTrue(tableFound);
-            logger.info("Table \"baselines.keyvalue\" was found; the nb5 command created it successfully");
-            // -> Check that the table holds the written key-value pairs
-            logger.info("Checking that table \"baselines.keyvalue\" has at least 5 rows of key-value pairs...");
-            result = session.execute("SELECT count(*) FROM baselines.keyvalue");
-            int rowCount = result.one().getInt(0);
-            assertTrue(rowCount >= 5);
-            logger.info("Table \"baselines.keyvalue\" has at least 5 rows of key-value pairs; the nb5 command created them successfully");
-        } catch (Exception e)
-        {
-            System.out.println(e.getMessage());
-        }
+        for (WorkloadDesc workloadDesc : basicWorkloadsMapPerDriver.get(Driver.CQL))
+        {
+            // STEP 0: Start the test container and expose port 9042 on the local host,
+            // so that the Docker bridge exposes it to the process invoker that runs nb5
+            // against Cassandra in that container.
+            cass.start();
+            int lastSlashIndex = workloadDesc.getWorkloadName().lastIndexOf('/');
+            String shortName = workloadDesc.getWorkloadName().substring(lastSlashIndex + 1);
+            // the default datacenter name
+            String datacenter = cass.getLocalDatacenter();
+            // When running with a local Docker daemon, exposed ports will usually be reachable on localhost.
+            // However, in some CI environments they may instead be reachable on a different host.
+            // the host port mapped to the container's exposed 9042
+            Integer mappedPort9042 = cass.getMappedPort(9042);
+            // the host IP of the Cassandra container
+            String hostIP = cass.getHost();
+            // STEP 1: Run the workload's default scenario to create and populate the baselines keyspace
+            String[] args = new String[]{
+                "java", "-jar", JARNAME, shortName, "default", "host="+hostIP, "localdc="+datacenter, "port="+mappedPort9042.toString()
+            };
+            logger.info("The final command line: " + String.join(" ", args));
+            ProcessResult runSchemaResult = invoker.run("run-workload", 30, args);
+            // STEP 2: Check runSchemaOut for errors
+            logger.info("Checking if the NB5 command resulted in any errors...");
+            assertThat(runSchemaResult.exception).isNull();
+            String runSchemaOut = String.join("\n", runSchemaResult.getStdoutData());
+            //assertThat(runSchemaOut.toLowerCase()).doesNotContain("error");
+            logger.info("NB5 command completed with no errors");
+            // STEP 3: Check the cluster for created data
+            try (CqlSession session = CqlSession.builder().addContactPoint(new InetSocketAddress(hostIP, mappedPort9042)).withLocalDatacenter(datacenter).build()) {
+                // -> Check for the creation of the keyspace "baselines"
+                logger.info("Checking for the creation of the keyspace \"baselines\"...");
+                ResultSet result = session.execute("SELECT keyspace_name FROM system_schema.keyspaces");
+                List<Row> rows = result.all();
+                boolean keyspaceFound = false;
+                for (Row row : rows) {
+                    if (row.getString("keyspace_name").equals("baselines")) {
+                        keyspaceFound = true;
+                        break;
+                    }
+                }
+                assertTrue(keyspaceFound);
+                logger.info("Keyspace \"baselines\" was found; the nb5 command created it successfully");
+                // -> Check for the creation of the table "baselines.keyvalue"
+                logger.info("Checking for the creation of the table \"baselines.keyvalue\"...");
+                result = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name='baselines'");
+                rows = result.all();
+                boolean tableFound = false;
+                for (Row row : rows) {
+                    if (row.getString("table_name").equals("keyvalue")) {
+                        tableFound = true;
+                        break;
+                    }
+                }
+                assertTrue(tableFound);
+                logger.info("Table \"baselines.keyvalue\" was found; the nb5 command created it successfully");
+                // -> Check that the table holds the written key-value pairs
+                logger.info("Checking that table \"baselines.keyvalue\" has at least 5 rows of key-value pairs...");
+                result = session.execute("SELECT count(*) FROM baselines.keyvalue");
+                int rowCount = result.one().getInt(0);
+                assertTrue(rowCount >= 5);
+                logger.info("Table \"baselines.keyvalue\" has at least 5 rows of key-value pairs; the nb5 command created them successfully");
+            } catch (Exception e)
+            {
+                System.out.println(e.getMessage());
+            }
+            cass.stop();
+        }
-        // STEP 5: Create a failing test to make sure the workload won't run; here we use a deliberately wrong IP
-        String[] args2 = new String[]{
-            "java", "-jar", JARNAME, "cql-keyvalue2.yaml", "default", "host=0.1.0.1", "localdc="+datacenter, "port="+mappedPort9042.toString(), "rampup-cycles=10", "main-cycles=10"
-        };
-        logger.info("The final command line: " + String.join(" ", args2));
-        ProcessResult runFailingSchemaResult = invoker.run("run-workload", 30, args2);
-        assertThat(runFailingSchemaResult.exception).isNull();
-        String runFailingSchemaOut = String.join("\n", runFailingSchemaResult.getStdoutData());
-        assertThat(runFailingSchemaOut.toLowerCase()).contains("error");
-        System.out.println("end");
+        // String[] args2 = new String[]{
+        //     "java", "-jar", JARNAME, "cql-keyvalue2.yaml", "default", "host=0.1.0.1", "localdc="+datacenter, "port="+mappedPort9042.toString(), "rampup-cycles=10", "main-cycles=10"
+        // };
+        // logger.info("The final command line: " + String.join(" ", args2));
+        // ProcessResult runFailingSchemaResult = invoker.run("run-workload", 30, args2);
+        // assertThat(runFailingSchemaResult.exception).isNull();
+        // String runFailingSchemaOut = String.join("\n", runFailingSchemaResult.getStdoutData());
+        // assertThat(runFailingSchemaOut.toLowerCase()).contains("error");
+        // System.out.println("end");
     }
     @AfterEach
     public void stopContainers(){
         cass.stop();
     }
-    static class CassandraWaitStrategy extends AbstractWaitStrategy {
-        public CassandraWaitStrategy() {
-            withStartupTimeout(Duration.ofMinutes(2));
-        }
-        @Override
-        protected void waitUntilReady() {
-            // Custom wait strategy to determine if Cassandra is ready.
-            // For example, we can check the logs or perform a cql query to verify the status of Cassandra.
-            String logs = cass.getLogs();
-            Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> {
-                if (logs.contains("Listening for thrift clients...")) {
-                    return true;
-                }
-                return false;
-            });
-        }
-    }
+    /*
+    This method filters the input list of workloads, returning the subset that includes the given scenario
+    and runs the specified driver.
+    */
+    public static List<WorkloadDesc> getBasicCheckWorkloadsForDriver(List<WorkloadDesc> workloads, String scenarioFilter, String driver) {
+        String substring = "driver=" + driver;
+        ArrayList<WorkloadDesc> workloadsForDriver = new ArrayList<>();
+        for (WorkloadDesc workload : workloads) {
+            if (workload.getScenarioNames().contains(scenarioFilter)) {
+                try {
+                    Path yamlPath = Path.of(workload.getYamlPath());
+                    List<String> lines = Files.readAllLines(yamlPath);
+                    for (String line : lines) {
+                        if (line.contains(substring)) {
+                            workloadsForDriver.add(workload);
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    System.out.println("Error reading file " + workload.getYamlPath() + ": " + e.getMessage());
+                }
+            }
+        }
+        return workloadsForDriver;
+    }
 }
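
A short usage sketch of the filter above, outside the JUnit lifecycle; it assumes the same classpath as the test, and the scenario name and driver string mirror the constants used there:

import io.nosqlbench.engine.api.scenarios.NBCLIScenarioParser;
import io.nosqlbench.engine.api.scenarios.WorkloadDesc;
import java.util.List;

public class ListCqlBasicChecks {
    public static void main(String[] argv) {
        // Gather the bundled example workloads, then keep only those that
        // define a basic_check scenario and reference driver=cql in their YAML.
        List<WorkloadDesc> workloads =
            NBCLIScenarioParser.getWorkloadsWithScenarioScripts(true, "examples");
        List<WorkloadDesc> cqlChecks = CassandraContainersIntegrationTest
            .getBasicCheckWorkloadsForDriver(workloads, "basic_check", "cql");
        cqlChecks.forEach(w -> System.out.println(w.getWorkloadName()));
    }
}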