Finalized CQL workload specific Test Container checks and added the infrastructure for other drivers

This commit is contained in:
MikeYaacoubStax
2023-02-10 16:13:05 -05:00
parent 67ba6e270d
commit f2dd1b6cf4
9 changed files with 76 additions and 291 deletions

View File

@@ -2,10 +2,12 @@ description: An IOT workload with more optimal settings for DSE
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
basic_check:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
@@ -45,7 +47,10 @@ blocks:
'compaction_window_unit': 'MINUTES',
'split_during_flush': true
};
truncate:
params:
prepared: false
ops:
truncate-table: |
truncate table <<keyspace:baselines>>.<<table:iot>>;
rampup:

View File

@@ -6,6 +6,7 @@ description: |
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
astra:
@@ -14,6 +15,7 @@ scenarios:
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
basic_check:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
@@ -53,6 +55,10 @@ blocks:
'compaction_window_size': <<expiry_minutes:60>>,
'compaction_window_unit': 'MINUTES'
};
truncate:
params:
prepared: false
ops:
truncate-table: |
truncate table <<keyspace:baselines>>.<<table:iot>>;
schema-astra:

View File

@@ -1,77 +0,0 @@
---
title: CQL Key-Value
weight: 1
---
## Description
The CQL Key-Value workload demonstrates the simplest possible schema with payload data. This is useful for measuring
system capacity most directly in terms of raw operations. As a reference point, it provides some insight into the types
of workloads that are constrained by messaging, threading, and tasking, rather than bulk throughput.
During preload, all keys are set with a value. During the main phase of the workload, random keys from the known
population are selected for upsert, with new values which never repeat.
## Operations
### insert (rampup, main)
insert into baselines.keyvalue (key, value) values (?,?);
### read (main)
select * from baselines.keyvalue where key=?key;
## Data Set
### baselines.keyvalue insert (rampup)
- key - text, number as string, selected sequentially up to keycount
- value - text, number as string, selected sequentially up to valuecount
### baselines.keyvalue insert (main)
- key - text, number as string, selected uniformly within keycount
- value - text, number as string, selected uniformly within valuecount
### baselines.keyvalue read (main)
- key - text, number as string, selected uniformly within keycount
## Workload Parameters
This workload has no adjustable parameters when used in the baseline tests.
When used for additional testing, the following parameters should be supported:
- keycount - the number of unique keys
- valuecount - the number of unique values
## Key Performance Metrics
Client side metrics are a more accurate measure of the system behavior from a user's perspective. For microbench and
baseline tests, these are the only required metrics. When gathering metrics from multiple server nodes, they should be
kept in aggregate form, for min, max, and average for each time interval in monitoring. For example, the avg p99 latency
for reads should be kept, as well as the min p99 latency for reads. If possible, metrics should be kept in plot form,
with discrete histogram values per interval.
### Client-Side
- read ops/s
- write ops/s
- read latency histograms
- write latency histograms
- exception counts
### Server-Side
- pending compactions
- bytes compacted
- active data on disk
- total data on disk
## Notes on Interpretation
Once the average ratio of overwrites starts to balance with the rate of compaction, a steady state should be achieved.
At this point, pending compactions and bytes compacted should be mostly flat over time.

View File

@@ -1,74 +0,0 @@
# CQL key-value baseline workload: a single two-column (text key, text value)
# table exercised with sequential rampup inserts and a mixed read/write main phase.
min_version: "5.17.1"
description: A workload with only text keys and text values

scenarios:
  default:
    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
  # Astra variant: identical phases, but uses the schema-astra block (no
  # compaction/table options that Astra disallows).
  astra:
    schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
  # Small-cycle smoke-test variant used by container-based verification tests.
  basic_check:
    schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
    rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
    main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto

bindings:
  # Sequential key/value generators for rampup: cycle number modulo the
  # configured population, rendered as a string.
  seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
  seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
  # Uniformly distributed key/value generators for the main phase.
  rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
  rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String

blocks:
  # DDL is run unprepared since prepared statements are for repeated DML.
  schema:
    params:
      prepared: false
    ops:
      create-table: |
        create table if not exists <<keyspace:baselines>>.<<table:keyvalue>> (
        key text,
        value text,
        PRIMARY KEY (key)
        );
  schema-astra:
    params:
      prepared: false
    ops:
      create-table: |
        create table if not exists <<keyspace:baselines>>.<<table:keyvalue>> (
        key text,
        value text,
        PRIMARY KEY (key)
        );
  # Sequentially load the key population so the main phase reads hit data.
  rampup:
    params:
      cl: <<write_cl:LOCAL_QUORUM>>
    ops:
      rampup-insert: |
        insert into <<keyspace:baselines>>.<<table:keyvalue>>
        (key, value)
        values ({seq_key},{seq_value});
  # Re-reads the sequentially written rows and checks the stored fields
  # against the same bindings that produced them.
  verify:
    params:
      cl: <<read_cl:LOCAL_QUORUM>>
      verify-fields: key->seq_key, value->seq_value
    ops:
      verify-select: |
        select * from <<keyspace:baselines>>.<<table:keyvalue>> where key={seq_key};
  # Main phase: 5:5 read/write ratio over uniformly random keys.
  main-read:
    params:
      ratio: 5
      cl: <<read_cl:LOCAL_QUORUM>>
    ops:
      main-select: |
        select * from <<keyspace:baselines>>.<<table:keyvalue>> where key={rw_key};
  main-write:
    params:
      ratio: 5
      cl: <<write_cl:LOCAL_QUORUM>>
    ops:
      main-insert: |
        insert into <<keyspace:baselines>>.<<table:keyvalue>> (key, value) values ({rw_key}, {rw_value});

View File

@@ -36,8 +36,8 @@ scenarios:
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
basic_check:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
params:
instrument: true

View File

@@ -6,6 +6,7 @@ description: |
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
astra:
@@ -13,7 +14,8 @@ scenarios:
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
basic_check:
schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10) threads=auto
@@ -54,6 +56,11 @@ blocks:
'compaction_window_size': TEMPLATE(expiry_minutes,60),
'compaction_window_unit': 'MINUTES'
};
truncate:
params:
prepared: false
ops:
truncate-table: |
truncate table TEMPLATE(keyspace,baselines).TEMPLATE(table,iot);
schema-astra:

View File

@@ -19,6 +19,43 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<profiles>
<profile>
<id>enable-container-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>run-container-tests</id>
<phase>integration-test</phase>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<includes>
<include>**/*Container*Verification*.java</include>
<include>**/*ContainerVerifications.java</include>
</includes>
<properties>
</properties>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>

View File

@@ -1,102 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb5.proof;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration test that runs the nb5 cql-iot-dse workload end-to-end against a
 * DataStax Enterprise (DSE) server started via Testcontainers. The nb5 jar is
 * invoked as an external process (not in-JVM) through {@code ProcessInvoker}.
 */
public class DSEContainerIntegrationTest {
    public static Logger logger = LogManager.getLogger(DSEContainerIntegrationTest.class);
    // Path to a java executable; prefers JAVA_HOME when set.
    // NOTE(review): this field is never read in this class — confirm whether it
    // was meant to replace the literal "java" passed to invoker.run below.
    private final String java = Optional.ofNullable(System.getenv(
        "JAVA_HOME")).map(v -> v + "/bin/java").orElse("java");
    // The nb5 fat jar built by the sibling nb5 module; the test assumes the
    // working directory is this module's directory.
    private final static String JARNAME = "../nb5/target/nb5.jar";
    // private static GenericContainer cass= new CassandraContainer("cassandra").withExposedPorts(9042);
    // Connection defaults; all three are overwritten in initContainer() with the
    // values reported by the running container.
    private static String hostIP = "127.0.0.1";
    private static String datacenter = "datacenter1";
    private static Integer mappedPort9042 = 9042;
    private static ProcessInvoker invoker = new ProcessInvoker();
    private static CassandraContainer dse;
    static {
        // The DSE image is not the official cassandra image, so it is declared a
        // compatible substitute for Testcontainers' CassandraContainer.
        // DS_LICENSE=accept is required for the DSE image to boot.
        dse = (CassandraContainer) new CassandraContainer(DockerImageName.parse("datastax/dse-server:6.8.17-ubi7").asCompatibleSubstituteFor("cassandra")).withEnv("DS_LICENSE", "accept").withEnv("CASSANDRA_DC", datacenter).withExposedPorts(9042);
    }
    @BeforeAll
    public static void initContainer() {
        //STEP0:Start the test container and expose the 9042 port on the local host.
        //So that the docker bridge controller exposes the port to our process invoker that will run nb5
        //and target cassandra on that docker container
        dse.start();
        datacenter = dse.getLocalDatacenter();
        //When running with a local Docker daemon, exposed ports will usually be reachable on localhost.
        // However, in some CI environments they may instead be reachable on a different host.
        mappedPort9042 = dse.getMappedPort(9042);
        hostIP = dse.getHost();
    }
    @BeforeEach
    public void setUp() {
        System.out.println("setup");
    }
    /**
     * Copies the cql-iot-dse workload out of the nb5 jar, then runs its default
     * scenario against the DSE container, asserting only that each nb5 process
     * invocation completed without raising an exception.
     */
    @Test
    public void testSimplePutAndGet() {
        invoker.setLogDir("logs/test");
        // //STEP1: Copy the example workload to the local dir
        ProcessResult copyResult = invoker.run("copy-workload", 30,
            "java", "-jar", JARNAME, "--copy=activities/baselines/cql-iot-dse.yaml"
        );
        assertThat(copyResult.exception).isNull();
        String copyOut = String.join("\n", copyResult.getStdoutData());
        //STEP2: Run the example cassandra workload using the schema tag to create the Cass Baselines keyspace
        String[] args = new String[]{
            "java", "-jar", JARNAME, "cql-iot-dse.yaml", "default", "host="+hostIP, "localdc="+datacenter, "port="+ mappedPort9042.toString()
        };
        ProcessResult runSchemaResult = invoker.run("run-workload", 30, args);
        logger.info("The final command line: " + String.join(" ", args));
        assertThat(runSchemaResult.exception).isNull();
        String runSchemaOut = String.join("\n", runSchemaResult.getStdoutData());
        System.out.println(runSchemaOut);
        //STEP3: Run the example cassandra workload using the rampup phase to create the data in a specific number of cycles
        // ProcessResult runRampUpResult = invoker.run("run-workload", 30, "java", "-jar", JARNAME, "run",
        //     "driver=cql", "workload=cql-keyvalue", "host="+hostIP, "localdc="+datacenter, "port="+ mappedPort9042.toString(),
        //     "tags=blocks:rampup", "cycles=100k"
        // );
        // assertThat(runRampUpResult.exception).isNull();
        // String runRampUpOut = String.join("\n", runRampUpResult.getStdoutData());
        // System.out.println(runRampUpOut);
        System.out.println("end");
    }
}

View File

@@ -37,8 +37,9 @@ import java.util.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class CassandraContainersIntegrationTest {
public class WorkloadContainerVerifications {
private enum Driver {
CQL("cql"),
HTTP("http"),
@@ -55,16 +56,14 @@ public class CassandraContainersIntegrationTest {
return name;
}
}
public static Logger logger = LogManager.getLogger(CassandraContainersIntegrationTest.class);
public static Logger logger = LogManager.getLogger(WorkloadContainerVerifications.class);
private final String java = Optional.ofNullable(System.getenv(
"JAVA_HOME")).map(v -> v + "/bin/java").orElse("java");
private final static String JARNAME = "../nb5/target/nb5.jar";
private final static String BASIC_CHECK_IDENTIFIER = "basic_check";
private static final CassandraContainer cass = (CassandraContainer) new CassandraContainer(DockerImageName.parse("cassandra:latest"))
.withExposedPorts(9042).withAccessToHost(true);
private static Map<Driver, List<WorkloadDesc>> basicWorkloadsMapPerDriver = null;
//.waitingFor(new CassandraWaitStrategy());
@BeforeAll
public static void listWorkloads() {
@@ -82,7 +81,6 @@ public class CassandraContainersIntegrationTest {
@BeforeEach
public void setUp() {
System.out.println("setup");
}
@@ -101,13 +99,12 @@ public class CassandraContainersIntegrationTest {
String path = workloadDesc.getYamlPath();
int lastSlashIndex = path.lastIndexOf('/');
String shortName = path.substring(lastSlashIndex + 1);
if(!shortName.equals("cql-keyvalue2.yaml"))
continue;;
if(shortName.equals("cql-iot-dse.yaml"))
continue;
//STEP0:Start the test container and expose the 9042 port on the local host.
//So that the docker bridge controller exposes the port to our process invoker that will run nb5
//and target cassandra on that docker container
cass.start();
//the default datacenter name
String datacenter = cass.getLocalDatacenter();
//When running with a local Docker daemon, exposed ports will usually be reachable on localhost.
@@ -118,10 +115,9 @@ public class CassandraContainersIntegrationTest {
String hostIP = cass.getHost();
//STEP1: Run the example cassandra workload using the schema tag to create the Cass Baselines keyspace
String[] args = new String[]{
"java", "-jar", JARNAME, shortName, BASIC_CHECK_IDENTIFIER, "host="+ hostIP, "localdc="+ datacenter, "port="+ mappedPort9042.toString()
"java", "-jar", JARNAME, shortName, BASIC_CHECK_IDENTIFIER, "host="+ hostIP, "localdc="+ datacenter, "port="+ mappedPort9042.toString(), "table=keyvalue", "keyspace=baselines"
};
logger.info("The final command line: " + String.join(" ", args));
ProcessResult runSchemaResult = invoker.run("run-workload", 30, args);
@@ -166,43 +162,30 @@ public class CassandraContainersIntegrationTest {
logger.info("Table \"baselines.keyvalue\" was found, nb5 command had created it successfully");
//->Check for the creation of the baselines keyvalue table
logger.info("Table \"baselines.keyvalue\" has at least 5 rows of key-value pairs, nb5 command had created them successfully");
result = session.execute("SELECT count(*) FROM baselines.keyvalue");
int rowCount = result.one().getInt(0);
ResultSet resultSet = session.execute("SELECT count(*) FROM baselines.keyvalue");
Row row = resultSet.one();
long rowCount = row.getLong(0);
logger.info("Number of rows in baselines.keyvalue: " + rowCount);
assertTrue(rowCount >= 5);
logger.info("Table \"baselines.keyvalue\" has at least 5 rows of key-value pairs, nb5 command had created them successfully");
} catch (Exception e)
{
System.out.println(e.getMessage());
fail();
} finally {
cass.stop();
}
cass.stop();
}
//STEP5 Create a failing test to make sure that the workload won't work, here we use a random wrong IP
// String[] args2 = new String[]{
// "java", "-jar", JARNAME, "cql-keyvalue2.yaml", "default", "host=0.1.0.1", "localdc="+datacenter, "port="+ mappedPort9042.toString(), "rampup-cycles=10", "main-cycles=10"
// };
// logger.info("The final command line: " + String.join(" ", args2));
// ProcessResult runFailingSchemaResult = invoker.run("run-workload", 30, args2);
// assertThat(runFailingSchemaResult.exception).isNull();
// String runFailingSchemaOut = String.join("\n", runFailingSchemaResult.getStdoutData());
// assertThat(runFailingSchemaOut.toLowerCase()).contains("error");
// System.out.println("end");
}
@AfterEach
public void stopContainers(){
public void cleanup(){
System.out.println("setup");
}
/*
This method filters the input list of workloads to output the subset of workloads that include a specific scenario (input)
and run the specified driver
This method filters the input list of workloads to output the subset of workloads
that include a specific scenario (input) and run the specified driver
*/
public static List<WorkloadDesc> getBasicCheckWorkloadsForDriver(List<WorkloadDesc> workloads ,String scenarioFilter, String driver) {
String substring = "driver=" + driver;