Merge branch 'main' of github.com:nosqlbench/nosqlbench into adapter-cockroachdb

This commit is contained in:
Madhavan Sridharan
2023-01-10 16:16:11 -05:00
73 changed files with 1022 additions and 1761 deletions

View File

@@ -94,7 +94,7 @@ jobs:
find . -ls
rsync -av --delete -I --exclude '_index.md' drivers/ nosqlbench-build-docs/site/content/docs/drivers
rsync -av --delete -I --exclude '_index.md' bindings/ nosqlbench-build-docs/site/content/docs/bindings
echo "previewdocs.nosqlbench.io" > nosqlbench-build-docs/site/staticCNAME
echo "previewdocs.nosqlbench.io" > nosqlbench-build-docs/site/static/CNAME
cd nosqlbench-build-docs
git add -A
CHANGES=$(git status --porcelain 2>/dev/null| wc -l)

View File

@@ -113,7 +113,7 @@ jobs:
PRERELEASE_BRANCH_PATTERN: "main"
GIT_RELEASE_BOT_NAME: "nb-droid"
GIT_RELEASE_BOT_EMAIL: ${{ secrets.GIT_RELEASE_BOT_EMAIL }}
ACCESS_TOKEN: ${{ secrets.GITHUB_ACCESS_TOKEN }}
ACCESS_TOKEN: ${{ secrets.NBDROID_TOKEN }}
GPG_ENABLED: "true"
GPG_KEY_ID: ${{ secrets.GITHUB_GPG_KEY_ID }}
GPG_KEY: ${{ secrets.GITHUB_GPG_KEY }}
@@ -171,7 +171,7 @@ jobs:
PRERELEASE_BRANCH_PATTERN: "main"
GIT_RELEASE_BOT_NAME: "nb-droid"
GIT_RELEASE_BOT_EMAIL: ${{ secrets.GIT_RELEASE_BOT_EMAIL }}
ACCESS_TOKEN: ${{ secrets.GITHUB_ACCESS_TOKEN }}
ACCESS_TOKEN: ${{ secrets.NBDROID_TOKEN }}
GPG_ENABLED: "true"
GPG_KEY_ID: ${{ secrets.GITHUB_GPG_KEY_ID }}
GPG_KEY: ${{ secrets.GITHUB_GPG_KEY }}

View File

@@ -0,0 +1,185 @@
- 808399ea1 (HEAD -> main, origin/main) github actions release testing
- a93c5acac checkpoint release for nb5
- b8e0bcd26 Merge branch 'main' of github.com:nosqlbench/nosqlbench
- 0eb71686a removed stale modules
- d785fe4bb Merge pull request #893 from nosqlbench/my-nosqlbench-440
- ea93403f4 (origin/my-nosqlbench-440) removed suffix parameter from tcp driver op dispensers (We have string templates to do that now)
- bca21e86b codecov impl
- 11e4c8cc4 Scan fix: cs #17, implicit narrowing
- 6881c0790 Revert "nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d"
- f8eecd541 Scan fix: cs #35, implicit narrowing
- 1b2655c27 nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d
- 376a9b04a combine build phases for main verification
- 4c2a18148 run example tests serially for now
- 4e418c7c7 remove engine-clients module
- 0b7d0aaad remove engine-clients from dependencies
- 1af40544b allow build on push for this branch
- f4aa80f9e remove previous "nb" module, which will be replaced by the nb5 module soon
- ccb2aefec update module versions
- edbf25abe remove nb module profile
- 8323c78e4 update metrics version to drop old slf4j dependency
- 1e7d67697 deactivate rest module with old dependencies
- ac279d364 consolidate error tests
- 0bc4064f3 sync stale versions on inactive modules
- 9f1c20e10 update and fill-in missing slf4j stub layer
- 9c73c4e18 remove previous "nb" module, which will be replaced by the nb5 module soon
- 2d16ff3b5 Revert "NB5 address high priority code scans" (#895)
- ecfe3d633 Narrowing fixes
- f1bb2ed06 use alternate inline binding form for zola
- 77243f325 make anchor links zola compatible
- b334a514f remove duplicitous doc file
- dfe1c0775 Merge pull request #896 from nosqlbench/nosqlbench-886-noslf4j
- 1c86fac3f (origin/nosqlbench-886-noslf4j, nosqlbench-886-noslf4j) combine build phases for main verification
- 688a6f728 run example tests serially for now
- c86029818 remove engine-clients module
- 3e2802a6e remove engine-clients from dependencies
- 50207b38f allow build on push for this branch
- 865c0ac38 remove previous "nb" module, which will be replaced by the nb5 module soon
- 8cb4f7b77 update module versions
- 55c6bca95 remove nb module profile
- f75eee5ac update metrics version to drop old slf4j dependency
- f379ad292 deactivate rest module with old dependencies
- f41250569 consolidate error tests
- fb8e27d33 sync stale versions on inactive modules
- 36fa9461d update and fill-in missing slf4j stub layer
- f38a635d7 Modified HDR Digits to 3 to save space on MetricsIntegrationTest.
- ffcc29414 Cleaned up "filename" configuration option from TCP. (Only available for Stdout)
- 45d00b187 Revert "NB5 address high priority code scans" (#895)
- 6146d186d Modified Writer Adapter Behavior for TCPServer to use poll and offer with a specified retry time (1s)
- 89c3bb453 Merge pull request #894 from nosqlbench/jeffb/code-scans-high
- d46ae1f12 Merge pull request #885 from nosqlbench/docfixes
- ccc7f7ab3 Cleaned up code after review.
- 23b79628b (origin/jeffb/code-scans-high) Narrowing fixes
- bdf61845f nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d
- 0139ea0ad fix: upgrade org.eclipse.jetty:jetty-servlets from 11.0.12 to 11.0.13 (#892)
- 277093d30 fix: upgrade org.eclipse.jetty:jetty-server from 11.0.12 to 11.0.13 (#891)
- f5ed9b011 fix: upgrade org.eclipse.jetty:jetty-servlet from 11.0.12 to 11.0.13 (#890)
- 9d577e669 fix: upgrade org.eclipse.jetty:jetty-rewrite from 11.0.12 to 11.0.13 (#889)
- 3fc6b195a fix: adapter-dynamodb/pom.xml to reduce vulnerabilities (#888)
- 19b8743f7 fix: upgrade com.github.oshi:oshi-core-java11 from 6.3.2 to 6.4.0 (#887)
- 38088659c remove previous "nb" module, which will be replaced by the nb5 module soon
- 7bcc7d6bf fix: upgrade joda-time:joda-time from 2.12.1 to 2.12.2 (#884)
- e4fd27649 (origin/docfixes, docfixes) use alternate inline binding form for zola
- 08fb6ca90 make anchor links zola compatible
- 972a6af65 remove duplicitous doc file
- 704e2751d Merge remote-tracking branch 'origin/main'
- c316bed26 support alternate inline format to work around docs shortcodes
- 52d5d10b8 Merge pull request #878 from nosqlbench/nosqlbench-875-categories
- dd1fc35d3 Merge pull request #880 from nosqlbench/nosqlbench-797-callable
- b6caecba4 Merge pull request #883 from nosqlbench/nosqlbench-881-fixtest
- 6731937c9 (nosqlbench-881-fixtest) modify logger calls for computable messages via closures
- a06cf565e enable async logging for tests
- 1af99a9b4 make logging-sensitive tests work with async logging
- 48055fe95 reduce logging output for integrated tests
- fd8e871da use local logger at debug level for any test output
- 78afbfbc5 Merge branch 'main' of github.com:nosqlbench/nosqlbench
- 10080720a (nosqlbench-797-callable) PR fixes from comments
- f8f7ca052 Merge pull request #872 from nosqlbench/nosqlbench-797-callable
- 54546ce9a (nosqlbench-875-categories) put required categories on all binding functions, not just one of each name
- b528a4e29 annotation processor asserts presence of categories on binding functions
- 74c234ddc Merge branch 'main' of github.com:nosqlbench/nosqlbench
- 301f1e2d2 added missing category for TriangularStepFunction
- 068c758bb Merge pull request #873 from nosqlbench/snyk-upgrade-e2924aae5a1f341565ee282682130a8f
- 77f31d193 fix: upgrade io.swagger.core.v3:swagger-models from 2.2.6 to 2.2.7
- 6da3798bb Update build.yml
- 1e0b18968 Merge pull request #871 from nosqlbench/snyk-upgrade-5b9126aa6ad92dc3e050d3528eccae75
- 6c8782842 change tests System.out calls to use logger directly
- 4f4b4982c templatize test logging level for logger setup and root logger
- d6c9f5153 test cleanup
- a6ff30d36 fix: upgrade io.dropwizard.metrics:metrics-graphite from 4.2.12 to 4.2.13
- d80632773 merge fixes for: improve concurrency patterns for activity execution
- b9365bff7 improve concurrency patterns for activity execution
- bf5a31b34 implement efficient concurrent thread state signaling
- 0de80887b move log4j.xml to log4j2.xml
- 077b41a3c misc naming, typo and formatting improvements
- 1816381f9 make integrated tests more robust
- 1416c71d9 internalize activity start logic
- c076b1079 make http extension docs distinct from http adapter docs
- 31dc3ce8f package housekeeping
- a1cde761f fix unapplied bug in diag space
- 3f14fefcb delegate activity instancing to dedicated loader
- ba42bfea4 allow conversion of missing config types
- 132192322 remove unused code paths to simplify refactoring
- bd129c442 run activities within a dedicated executor
- 09f776ad2 Merge pull request #870 from nosqlbench/stargate-v2-update
- 72dd19cb2 fixing operation syntax
- 2a62cc882 fix: upgrade com.github.docker-java:docker-java-transport-okhttp from 3.2.13 to 3.2.14 (#868)
- 7bc4265af fix: adapter-dynamodb/pom.xml to reduce vulnerabilities (#867)
- c4415e6c0 Merge pull request #863 from nosqlbench/my-NB5-I446
- 8fd13fcdd (origin/my-NB5-I446) added explicit cast to explicitly narrow precision for security purposes
- dc2174b95 fixed missing licenses
- e18dcbc22 Merge pull request #864 from nosqlbench/nosqlbench-861-dryrun
- fc7d72fac Merge pull request #866 from nosqlbench/nosqlbench-865-urlencode
- 3013dc315 fixed s4j.md to be compatible with export-docs
- 74bfbc691 Merge branch 'main' into my-NB5-I446
- 084dddf1d nosqlbench-865 Apply urlencoder logic in http for all op field forms
- d1d21c8e5 nosqlbench-861 Allow dry-run of op generation.
- 39a9f5e92 further cleanup of unused comments and commands
- b63eba35c fixed pulsar.md for export-docs after merge
- a06e1556f merged my-NB5-I446 with main
- f76c63b99 only launch build workflow when pushing to main branch
- 7eeb1d288 fixed unused dependency failure
- ad855e625 modified build workflow v2
- 99a4c9fb4 modified build workflow
- ec8ed024f Changed build workflow to re-add verify and run on non-release builds, removed unused dependency from nbr
- c149684d0 moved from commonmark to inhouse MutableMarkdown for verification
- bb28f0374 fix: mvn-defaults/pom.xml to reduce vulnerabilities (#859)
- 3891073ff fix: upgrade io.swagger.parser.v3:swagger-parser from 2.1.7 to 2.1.9 (#858)
- 3cd978950 fix: upgrade com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider from 2.14.0 to 2.14.1 (#857)
- 97bb2e7b9 fix: upgrade org.openjdk.jmh:jmh-core from 1.35 to 1.36 (#852)
- 60252ed2b fix: upgrade io.dropwizard.metrics:metrics-core from 4.2.12 to 4.2.13 (#851)
- 8545673f7 fix: upgrade com.github.oshi:oshi-core-java11 from 6.3.1 to 6.3.2 (#850)
- 7ba2a3de3 Moved CNAME to correct directory
- 04153d729 simplified staging command
- e29bfddc1 (origin/timeout_fix) avoid JVM timeout bug with lower default
- 9f3cae591 fix: upgrade com.amazonaws:aws-java-sdk-s3 from 1.12.330 to 1.12.347 (#849)
- 42d1403fb fix: nb-api/pom.xml to reduce vulnerabilities (#848)
- c514a62fa fix: upgrade org.openjdk.jmh:jmh-generator-annprocess from 1.35 to 1.36 (#853)
- 0c0156201 fix: adapter-dynamodb/pom.xml to reduce vulnerabilities (#855)
- bca55d594 Added status command for debug
- b1f0d590f Added hard overwrite for existing files
- d472b071d excluded _index.md from rsync dynamic update
- f0507cafb Merge pull request #854 from yabinmeng/main
- 37f889359 Fix Kafka producer transaction issue and add support for subscribing from multiple topics. NB yaml files and README update
- 3943a2945 Merge pull request #846 from nosqlbench/dependabot/maven/mvn-defaults/io.netty-netty-codec-haproxy-4.1.86.Final
- 17e4cb323 Merge pull request #847 from yabinmeng/main
- 1850a7c16 Add NB5 Kafka driver
- 1adb553b3 build(deps): bump netty-codec-haproxy in /mvn-defaults
- 2b39f3c35 Merge branch 'main' into nosqlbench-797-callable
- 21a33aec9 final version v1
- 5ccb06470 back to base simple case
- ee71bdd17 added checkout action v5
- 0c8ac02f3 added checkout action v4
- 2c61e9c52 added checkout action v3
- 875d0d960 Merge pull request #844 from nosqlbench/nosqlbench-843-cqlgen-mv
- 4383ba208 added checkout action v2
- 26e0b4cb0 added checkout action
- 4a178fc21 added git remote command to debug remote v2
- 43fa3e725 added git remote command to debug remote
- c0117351f Merge pull request #845 from nosqlbench/snyk-fix-b8424b271d02d50c5baa47f9e8745c03
- df534fd83 Merge branch 'main' into snyk-fix-b8424b271d02d50c5baa47f9e8745c03
- 5a47fded6 corrected workflow file for debug v2
- e09a93ea9 corrected workflow file for debug
- 0e9dff474 testing simple case for access debug
- f660c6272 converted to simple case for access debug
- 35f4bb982 (nosqlbench-843-cqlgen-mv) avoid NPE when cqlgen parses materialized views
- 21eb031da Merge pull request #837 from nosqlbench/nosqlbench-835-count-warning
- 8240844f6 Merge pull request #838 from nosqlbench/nosqlbench-836
- b3bcc3ce5 Merge pull request #839 from nosqlbench/nosqlbench-834-triangularstep
- 21bc389f2 Merge pull request #842 from nosqlbench/nosqlbench-841-hashedrange
- 8a48a020d (nosqlbench-841-hashedrange) allow zero width ranges
- 862058d80 (nosqlbench-834-triangularstep, nosqlbench-797) make TriangularStep ctor public
- 9d1e5c14d (nosqlbench-836) nosqlbench-836 expand default data retention in promql from 6 mo to 10 years
- 4b04cb3aa (nosqlbench-835-count-warning) move warning to ctor
- 457c75bc3 (nosqlbench-841-hashedrangesize) Merge pull request #832 from yabinmeng/main
- 2f5dd1f74 clean up
- bbbdb60dc added change directory before git command
- b9efec960 Modified git commands in build workflow for docs
- ae1a3af9c Corrected build workflow clone path
- ea46cbb74 removed debug commands and corrected build workflow clone path
- b77940715 Added debug commands for GHA
- 10d9ce314 Disabled verify for testing
- e7705d13d NB5-I446 Modified export-docs command to export organized driver and binding docs, also removed gendocs command (VirtDataGenDocsApp now used for generating bindings in StringBuilders to pass to the zipstream) added the export-docs build workflow job Updated build.yml to fix path to nb5 in workflow uploading/downloading docs artifacts in build workflow Attempt to correct the path to the exported docs to upload them Corrected exported-docs path Corrected yml build workflow error corrected naming of exported docs
- c08f8b6b3 1) Add NB5 S4J example yaml files 2) Update NB5 S4J readme file 3) Minor code adjustments
- 9a00af080 Merge branch 'nosqlbench:main' into main
- c102b213a github actions release test

View File

@@ -20,7 +20,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -38,7 +38,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -37,13 +37,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

View File

@@ -20,7 +20,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -39,7 +39,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -20,7 +20,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -38,7 +38,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -41,13 +41,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -36,7 +36,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -41,13 +41,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -39,13 +39,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.oss/pulsar-jms -->

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -38,14 +38,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -40,9 +40,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-stdout</artifactId>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,247 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class TcpAdapterSpace {
private final static Logger logger = LogManager.getLogger(TcpAdapterSpace.class);
private final String name;
private final NBConfiguration config;
public TcpAdapterSpace(String name, NBConfiguration config) {
this.name = name;
this.config = config;
}
protected PrintWriter createPrintWriter() {
SocketFactory socketFactory = SocketFactory.getDefault();
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSocketFactory(sslCfg);
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(Integer.class, "port").orElse(12345);
try {
Socket socket = socketFactory.createSocket(host, port);
logger.info(() -> "connected to " + socket.toString());
return new PrintWriter(socket.getOutputStream());
} catch (IOException e) {
throw new RuntimeException("Error opening socket:" + e, e);
}
}
public void TCPServerActivity(ActivityDef activityDef) {
// super(activityDef);
// boolean sslEnabled = activityDef.getParams().getOptionalBoolean("ssl").orElse(false);
// this.capacity=activityDef.getParams().getOptionalInteger("capacity").orElse(10);
// queue = new LinkedBlockingQueue<>(capacity);
//
// if (sslEnabled) {
//
// NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(activityDef.getParams());
// socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(sslCfg);
// } else {
// socketFactory = ServerSocketFactory.getDefault();
// }
}
// public void shutdownActivity() {
// super.shutdownActivity();
// for (TCPServerActivity.Shutdown toClose : managedShutdown) {
// toClose.shutdown();
// }
// }
// server write
// @Override
// public synchronized void write(String statement) {
// while (true) {
// try {
// queue.put(statement);
// return;
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// }
// create server writer
//
// @Override
// protected synchronized Writer createPrintWriter() {
//
// String host = getActivityDef().getParams().getOptionalString("host").orElse("localhost");
// int port = getActivityDef().getParams().getOptionalInteger("port").orElse(12345);
//
// if (listenerSocket == null || listenerSocket.isClosed()) {
// try {
// InetAddress hostAddr = InetAddress.getByName(host);
// listenerSocket = socketFactory.createServerSocket(port, 10, hostAddr);
// if (socketFactory instanceof SSLServerSocketFactory) {
// logger.info(() -> "SSL enabled on server socket " + listenerSocket);
// }
// TCPServerActivity.SocketAcceptor socketAcceptor = new TCPServerActivity.SocketAcceptor(queue, listenerSocket);
// managedShutdown.add(socketAcceptor);
// Thread acceptorThread = new Thread(socketAcceptor);
// acceptorThread.setDaemon(true);
// acceptorThread.setName("Listener/" + listenerSocket);
// acceptorThread.start();
// } catch (IOException e) {
// throw new RuntimeException("Error listening on listenerSocket:" + e, e);
// }
// }
//
// TCPServerActivity.QueueWriterAdapter queueWriterAdapter = new TCPServerActivity.QueueWriterAdapter(this.queue);
// logger.info(() -> "initialized queue writer:" + queueWriterAdapter);
// return queueWriterAdapter;
//
// }
// socket writer
// public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
// this.sourceQueue = sourceQueue;
// try {
// outputStream = connectedSocket.getOutputStream();
// this.writer = new OutputStreamWriter(outputStream);
// //connectedSocket.shutdownInput();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// server thread
// public void run() {
// try (Writer writer = this.writer) {
// while (true) {
// while (!sourceQueue.isEmpty() || running) {
// try {
// String data = sourceQueue.take();
// writer.write(data);
// writer.flush();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// try {
// Thread.sleep(10);
// } catch (InterruptedException ignored) {
// }
// }
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
//
// }
// server writer adapter
// public static class QueueWriterAdapter extends Writer {
// private BlockingQueue<String> queue;
//
// public QueueWriterAdapter(BlockingQueue<String> queue) {
// this.queue = queue;
// }
//
// @Override
// public synchronized void write( char[] cbuf, int off, int len) {
// while (true) {
// try {
// queue.put(new String(cbuf, off, len));
// return;
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// }
//
// @Override
// public synchronized void flush() throws IOException {
// }
//
// @Override
// public synchronized void close() throws IOException {
// flush();
// queue = null;
// }
//
// }
// server socket acceptor
// public class SocketAcceptor implements Runnable, TCPServerActivity.Shutdown {
// private final BlockingQueue<String> queue;
// private final ServerSocket serverSocket;
// private boolean running = true;
//
// public SocketAcceptor(BlockingQueue<String> queue, ServerSocket serverSocket) {
// this.queue = queue;
// this.serverSocket = serverSocket;
// }
//
// public void shutdown() {
// this.running = false;
// }
//
// @Override
// public void run() {
// try (ServerSocket serverSocket = this.serverSocket) {
// while (running) {
// serverSocket.setSoTimeout(1000);
// serverSocket.setReuseAddress(true);
// try {
// Socket connectedSocket = serverSocket.accept();
// TCPServerActivity.SocketWriter writer = new TCPServerActivity.SocketWriter(queue, connectedSocket);
// TCPServerActivity.this.managedShutdown.add(writer);
// Thread writerThread = new Thread(writer);
// writerThread.setName("SocketWriter/" + connectedSocket);
// writerThread.setDaemon(true);
// writerThread.start();
// logger.info(() -> "Started writer thread for " + connectedSocket);
// } catch (SocketTimeoutException ignored) {
// }
// }
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
// }
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpAdapterSpace.class)
.asReadOnly();
}
}

View File

@@ -1,32 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
@Service(value= DriverAdapter.class,selector = "tcp")
public class TcpDriverAdapter extends BaseDriverAdapter<Op,TcpAdapterSpace> {
@Override
public OpMapper<Op> getOpMapper() {
return new TcpOpMapper(this);
}
}

View File

@@ -0,0 +1,109 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapter.tcpserver.TcpServerAdapterSpace;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.Socket;
public class TcpClientAdapterSpace {
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
public TcpClientAdapterSpace(NBConfiguration config) {
this.config = config;
this.writer = createPrintWriter();
}
protected PrintWriter createPrintWriter() {
SocketFactory socketFactory = SocketFactory.getDefault();
boolean sslEnabled = config.getOptional(boolean.class, "ssl").orElse(false);
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSocketFactory(sslCfg);
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(12345);
try {
Socket socket = socketFactory.createSocket(host, port);
logger.info("connected to " + socket.toString());
return new PrintWriter(socket.getOutputStream());
} catch (IOException e) {
throw new RuntimeException("Error opening socket:" + e, e);
}
}
public void writeflush(String text) {
try {
writer.write(text);
writer.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpClientAdapterSpace.class)
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("host","localhost")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("newline",true)
.setDescription("whether to automatically add a missing newline to the end of any output\n")
)
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("""
Which format to use.
If provided, the format will override any statement formats provided by the YAML.
If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();
}
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger(TcpClientDriverAdapter.class);
private final static StdoutDriverAdapter adap = new StdoutDriverAdapter();
@Override
public OpMapper<TcpClientOp> getOpMapper() {
DriverSpaceCache<? extends TcpClientAdapterSpace> ctxCache = getSpaceCache();
return new TcpClientOpMapper(this,ctxCache);
}
@Override
public Function<String, ? extends TcpClientAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpClientAdapterSpace(cfg);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(super.getConfigModel())
.add(TcpClientAdapterSpace.getConfigModel());
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
return adap.getSyntheticOpTemplates(stmtsDocList, cfg);
}
}

View File

@@ -14,20 +14,20 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.RunnableOp;
public class TcpOpDispenser extends BaseOpDispenser<Op,TcpAdapterSpace> {
public class TcpClientOp implements RunnableOp {
public TcpOpDispenser(TcpDriverAdapter adapter, ParsedOp op) {
super(adapter, op);
private final TcpClientAdapterSpace ctx;
private final String text;
public TcpClientOp(TcpClientAdapterSpace ctx, String text) {
this.ctx = ctx;
this.text = text;
}
@Override
public Op apply(long cycle) {
return new TcpOp(cycle);
public void run() {
ctx.writeflush(text);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
this.outFunction = stringFunction;
}
@Override
public TcpClientOp apply(long value) {
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpClientOp(ctx,output);
}
}

View File

@@ -14,23 +14,32 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
public class TcpOpMapper implements OpMapper<Op> {
import java.util.function.LongFunction;
private final TcpDriverAdapter adapter;
public class TcpClientOpMapper implements OpMapper<TcpClientOp> {
public TcpOpMapper(TcpDriverAdapter adapter) {
private final DriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache;
private final TcpClientDriverAdapter adapter;
public TcpClientOpMapper(TcpClientDriverAdapter adapter, DriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp op) {
return new TcpOpDispenser(adapter,op);
public OpDispenser<TcpClientOp> apply(ParsedOp op) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpClientAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
return new TcpClientOpDispenser(adapter,op,ctxfunc);
}
}

View File

@@ -0,0 +1,295 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.ServerSocket;
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
public class TcpServerAdapterSpace implements AutoCloseable{
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
private LinkedBlockingQueue<String> queue;
private ServerSocket listenerSocket;
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
public TcpServerAdapterSpace(NBConfiguration config) {
this.config = config;
this.writer = createPrintWriter();
}
private Writer createPrintWriter() {
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
queue = new LinkedBlockingQueue<>(capacity);
ServerSocketFactory socketFactory;
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(sslCfg);
} else {
socketFactory = ServerSocketFactory.getDefault();
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(12345);
if (listenerSocket == null || listenerSocket.isClosed()) {
try {
InetAddress hostAddr = InetAddress.getByName(host);
listenerSocket = socketFactory.createServerSocket(port, 10, hostAddr);
if (socketFactory instanceof SSLServerSocketFactory) {
logger.info("SSL enabled on server socket " + listenerSocket);
}
SocketAcceptor socketAcceptor = new SocketAcceptor(queue, listenerSocket);
managedShutdown.add(socketAcceptor);
Thread acceptorThread = new Thread(socketAcceptor);
acceptorThread.setDaemon(true);
acceptorThread.setName("Listener/" + listenerSocket);
acceptorThread.start();
} catch (IOException e) {
throw new RuntimeException("Error listening on listenerSocket:" + e, e);
}
}
QueueWriterAdapter queueWriterAdapter = new QueueWriterAdapter(this.queue);
logger.info("initialized queue writer:" + queueWriterAdapter);
return queueWriterAdapter;
}
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
public void writeflush(String text) {
try {
if(this.writer == null)
{
this.writer = createPrintWriter();
}
this.writer.write(text);
this.writer.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpServerAdapterSpace.class)
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("capacity",10)
.setDescription("the capacity of the queue")
)
.add(
Param.defaultTo("host","localhost")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("newline",true)
.setDescription("whether to automatically add a missing newline to the end of any output\n")
)
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("""
Which format to use.
"If provided, the format will override any statement formats provided by the YAML.
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();
}
private interface Shutdown {
void shutdown();
}
public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue;
private final Socket connectedSocket;
private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue;
this.connectedSocket = connectedSocket;
}
public void shutdown() {
this.running = false;
}
@Override
public void run() {
OutputStream outputStream = null;
try {
outputStream = connectedSocket.getOutputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
try (Writer runWriter = new OutputStreamWriter(outputStream);) {
while (running ) {
if(!sourceQueue.isEmpty()) {
try {
//String data = sourceQueue.take();
String data = sourceQueue.poll(1, TimeUnit.SECONDS);
if(data == null) {
continue;
}
runWriter.write(data);
runWriter.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static class QueueWriterAdapter extends Writer {
private BlockingQueue<String> queue;
private volatile boolean running = true;
public QueueWriterAdapter(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public synchronized void write( char[] cbuf, int off, int len) {
String message = new String(cbuf, off, len);
while (running) {
try {
if(queue.offer(message, 1, TimeUnit.SECONDS)) {
return;
}
} catch (InterruptedException ignored) {
logger.debug("QueueWriterAdapter was interrupted");
running =false;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public synchronized void flush() throws IOException {
}
@Override
public synchronized void close() throws IOException {
flush();
running =false;
queue = null;
}
}
public class SocketAcceptor implements Runnable, Shutdown {
private final BlockingQueue<String> queue;
private final ServerSocket serverSocket;
private boolean running = true;
public SocketAcceptor(BlockingQueue<String> queue, ServerSocket serverSocket) {
this.queue = queue;
this.serverSocket = serverSocket;
}
public void shutdown() {
this.running = false;
}
@Override
public void run() {
try (ServerSocket runServerSocket = this.serverSocket) {
while (running) {
runServerSocket.setSoTimeout(1000);
runServerSocket.setReuseAddress(true);
try {
Socket connectedSocket = runServerSocket.accept();
SocketWriter writer = new SocketWriter(queue, connectedSocket);
managedShutdown.add(writer);
Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(false);
writerThread.start();
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException e) {
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
@Service(value= DriverAdapter.class, selector="tcpserver")
public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpServerAdapterSpace> implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger(TcpServerDriverAdapter.class);
private final static StdoutDriverAdapter adap = new StdoutDriverAdapter();
@Override
public OpMapper<TcpServerOp> getOpMapper() {
DriverSpaceCache<? extends TcpServerAdapterSpace> ctxCache = getSpaceCache();
return new TcpServerOpMapper(this,ctxCache);
}
@Override
public Function<String, ? extends TcpServerAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpServerAdapterSpace(cfg);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(super.getConfigModel())
.add(TcpServerAdapterSpace.getConfigModel());
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
return adap.getSyntheticOpTemplates(stmtsDocList, cfg);
}
}

View File

@@ -14,11 +14,19 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.RunnableOp;
public class TcpOp implements Op {
public TcpOp(long cycle) {
public class TcpServerOp implements RunnableOp {
private final TcpServerAdapterSpace ctx;
private final String text;
public TcpServerOp(TcpServerAdapterSpace ctx, String text) {
this.ctx = ctx;
this.text = text;
}
public void run() {
ctx.writeflush(text);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
this.outFunction = stringFunction;
}
@Override
public TcpServerOp apply(long value) {
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpServerOp(ctx,output);
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpMapper implements OpMapper<TcpServerOp> {
private final DriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache;
private final TcpServerDriverAdapter adapter;
public TcpServerOpMapper(TcpServerDriverAdapter adapter, DriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<TcpServerOp> apply(ParsedOp op) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpServerAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
return new TcpServerOpDispenser(adapter,op,ctxfunc);
}
}

View File

@@ -5,27 +5,20 @@
The tcpclient driver is based on the behavior of the stdout driver. You configure the tcpclient driver in exactly the
same way as the stdout driver, except for the additional parameters shown here.
The tcpclient driver connects to the configured server address and port (a socket address). When connected, it sends any
buffered lines of data to the server.
If the buffer is primed with data when the client is connected to a server, it will send all of the data at once. After
this, data is added to the buffer at whatever cyclerate the activity is configured for. If you add data to the buffer
faster than you can send it to a connected server, you will have a number of failed operations.
However, the opposite is not true. You should generally ensure that you can send the data as fast as you provide it, and
the error counts give you a relatively easy way to verify this. If you wish to disable this behavior, set the retries to
a very high value. In this case, the tries metric will still give you some measure of internal buffer saturation.
The tcpclient driver connects to a configured host and port (a socket address). When a server is listening on that socket,
then the data for each cycle is written via the socket just like stdout would write.
## Examples
Run a stdout activity named 'stdout-test', with definitions from activities/stdout-test.yaml
... driver=tcpserver yaml=stdout-test
... driver=tcpclient yaml=stdout-test
## Driver Parameters
- **retry_delay** - The internal retry frequency at which the internal cycle loop will attempt to add data to the
buffer. This applies when the internal buffer is full and the client isn't able to send data from it.
buffer. This applies when the internal buffer is full and no clients are consuming data from it.
- unit: milliseconds
- default: 1000
- dynamic: false
@@ -33,6 +26,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
failed.
- default: 3
- dynamic: false
- **ssl** - boolean to enable or disable ssl
- default: false
- dynamic: false
@@ -41,10 +35,10 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
[Additional parameters may need to be provided](../../../../driver-cql/src/main/resources/ssl.md).
- **host** - this is the name to connect to (remote server IP address)
- **host** - this is the name to bind to (local interface address)
- default: localhost
- dynamic: false
- **port** - this is the name of the port to connect to (remote server port)
- **port** - this is the name of the port to listen on
- default: 12345
- dynamic: false
- **capacity** - the size of the internal blocking queue

View File

@@ -9,6 +9,8 @@ The tcpserver driver listens on a configured host and port (a socket address). W
internal queue is buffered to them as long as there is data in it. For each cycle of data in the internal buffer, one of
the connected clients will get it in unspecified order.
The driver activity will block as long as there are still messages in the queue (Max:capacity). To ensure that the queue is empties and the activity shuts down correctly, one must make sure that a client connects to the server to receive the queued messages.
If the buffer is primed with data when a client is connected it will get all of the data at once. After this, data is
added to the buffer at whatever cyclerate the activity is configured for. If you add data to the buffer faster than you
can consume it with connected clients, you will have a number of failed operations.

View File

@@ -1,3 +0,0 @@
# tcp help topics
- tcpclient
- tcpserver

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -39,19 +39,19 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-spectest</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>

View File

@@ -28,7 +28,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -37,7 +37,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
@@ -133,7 +133,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -1,49 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2022-2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.17.32-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-cockroachdb</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A CockroachDB ActivityType driver for http://nosqlbench.io/
</description>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jdbc</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,79 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.cockroachdb;
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.postgresql.ds.PGSimpleDataSource;
import javax.sql.DataSource;
import java.util.Arrays;
public class CockroachActivity extends JDBCActivity {
private static final Logger LOGGER = LogManager.getLogger(CockroachActivity.class);
public CockroachActivity(ActivityDef activityDef) {
super(activityDef);
}
// TODO provide an error handler with sane defaults including
// * retry on 40001 SQL state code (CockroachDB txn retry)
// * retry (implement exponential, to avoid stampeding herd) on timeout getting connection from connection pool
//
//@Override
//public NBErrorHandler getErrorHandler() {
//}
@Override
protected DataSource newDataSource() {
PGSimpleDataSource ds = new PGSimpleDataSource();
// serverName is required
String serverName = getParams().
getOptionalString("serverName").
orElseThrow(() -> new RuntimeException("serverName parameter required"));
// portNumber, databaseName, user, password are optional
Integer portNumber = getParams().getOptionalInteger("portNumber").orElse(26257);
String databaseName = getParams().getOptionalString("databaseName").orElse(null);
String user = getParams().getOptionalString("user").orElse(null);
String password = getParams().getOptionalString("password").orElse(null);
ds.setServerNames(new String[]{serverName});
ds.setPortNumbers(new int[]{portNumber});
if (databaseName != null) {
ds.setDatabaseName(databaseName);
}
if (user != null) {
ds.setUser(user);
}
if (password != null) {
ds.setPassword(password);
}
LOGGER.debug("Final DataSource fields:"
+ " serverNames=" + Arrays.toString(ds.getServerNames())
+ " portNumbers=" + Arrays.toString(ds.getPortNumbers())
+ " databaseName=" + ds.getDatabaseName()
+ " user=" + ds.getUser()
+ " password=" + ds.getPassword());
return ds;
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.cockroachdb;
import io.nosqlbench.activitytype.jdbc.api.JDBCActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "cockroachdb")
public class CockroachActivityType implements ActivityType<CockroachActivity> {
@Override
public ActionDispenser getActionDispenser(CockroachActivity activity) {
return new JDBCActionDispenser(activity);
}
@Override
public CockroachActivity getActivity(ActivityDef activityDef) {
return new CockroachActivity(activityDef);
}
}

View File

@@ -1,52 +0,0 @@
min_version: "4.17.15"
description: An example of a basic cockroach insert
scenarios:
default:
main: |
run driver=cockroachdb tags==block:"main.*" threads=auto cycles===<<main-cycles:1000000>>
serverName=localhost connectionpool=hikari
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;.*40001:count,retry;stop
rampup: |
run driver=cockroachdb tags==block:rampup threads=auto cycles===<<rampup-cycles:1000000>>
serverName=localhost connectionpool=hikari
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;.*40001:count,retry;stop
schema: |
run driver=cockroachdb tags==block:schema threads===1 serverName=localhost
bindings:
seq_key: Mod(<<keyCount:1000000>>L); ToInt()
seq_value: Mod(<<valueCount:1000000000>>L); <<valueSizeDist:Hash()>>; ToString() -> String
rw_key: <<keyDist:Uniform(0,1000000)->long>>; ToInt()
rw_value: <<valDist:Uniform(0,1000000000)->int>>; <<valueSizeDist:Hash()>>; ToString() -> String
blocks:
schema:
ops:
create-database: |
CREATE DATABASE <<database:bank>>;
create-table: |
CREATE TABLE IF NOT EXISTS <<database:bank>>.<<table:banktransaction>> (
code STRING PRIMARY KEY,
amount INTEGER
);
rampup:
ops:
rampup-insert: |
INSERT INTO "<<database:bank>>"."<<table:banktransaction>>"
(code, amount) VALUES ('{seq_key}', {seq_value})
ON CONFLICT (code) DO NOTHING;
main-read:
params:
ratio: <<read_ratio:1>>
ops:
main-find: |
SELECT code, amount FROM "<<database:bank>>"."<<table:banktransaction>>"
WHERE code = '{rw_key}' AND amount = {rw_value};
main-write:
params:
ratio: <<write_ratio:1>>
ops:
main-insert: |
UPDATE "<<database:bank>>"."<<table:banktransaction>>" SET amount = {seq_value} WHERE code = '{seq_key}';

View File

@@ -1,66 +0,0 @@
min_version: "4.17.15"
description: An example of a basic postgres bank transaction workload
scenarios:
default:
main: |
run driver===cockroachdb tags===block:"main.*" threads=auto cycles=10000000
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
password=postgres connectionpool=hikari
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
rampup: |
run driver===cockroachdb tags===block:rampup threads=auto cycles=<<accounts:1000000>>
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
password=postgres connectionpool=hikari filler-binding="AlphaNumericString(10)"
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
rampup-large: |
run driver===cockroachdb tags===block:rampup threads=auto cycles=<<accounts:1000000>>
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
password=postgres connectionpool=hikari
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
schema: |
run driver===cockroachdb tags===block:schema threads===1 serverName=localhost portNumber=5432
databaseName=bank user=postgres password=postgres
bindings:
seq_uuid: Mod(<<accounts:1000000>>L); ToHashedUUID()
rand_uuid: Uniform(0,<<accounts:1000000>>L); ToHashedUUID()
rand_amount: Poisson(2000000); ToInt()
timestamp: StartingEpochMillis('2018-01-01 00:00:00'); ToDateTime()
filler: <<filler-binding:HashedLineToStringList('data/lorem_ipsum_full.txt', 150, 150)>>; ToString()
blocks:
schema:
ops:
create-table: |
CREATE TABLE IF NOT EXISTS "<<table:account>>" (
uuid UUID PRIMARY KEY,
amount INTEGER,
amount_unit VARCHAR(64),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
filler TEXT
);
create-indices: |
CREATE INDEX IF NOT EXISTS amount_idx on "<<table:account>>" (amount);
CREATE INDEX IF NOT EXISTS updated_at_idx on "<<table:account>>" (updated_at);
rampup:
ops:
rampup-insert: |
INSERT INTO "<<table:account>>" (uuid, amount, amount_unit, updated_at, created_at, filler)
VALUES ('{seq_uuid}', {rand_amount}, 'us_cents', '{timestamp}', '{timestamp}', '{filler}')
ON CONFLICT DO NOTHING;
main-read:
params:
ratio: <<read_ratio:2>>
ops:
main-find: |
SELECT * FROM "<<table:account>>" WHERE uuid = '{rand_uuid}';
main-write:
params:
ratio: <<write_ratio:1>>
ops:
main-insert: |
UPDATE "<<table:account>>" SET amount = {rand_amount}, updated_at = '{timestamp}' WHERE uuid = '{rand_uuid}';

View File

@@ -1,50 +0,0 @@
# CockroachDB Driver
This is a driver for CockroachDB. It extends the generic JDBC Driver and
inherits its parameters.
### CockroachDB driver parameters
All parameters correspond to the postgresql JDBC library parameters. See
the
[DataSource Configuration Properties](https://jdbc.postgresql.org/documentation/81/ds-ds.html)
section for detailed parameter documentation.
* **serverName** (required) - database hostname.
* **databaseName** (optional) - database namespace to use; Default *null*.
* **portNumber** (optional) - database listen port; Default *26257*.
* **user** (optional) - database account username; Default *null*.
* **password** (optional) - database account password; Default *null*.
* **connectionpool** (optional) - connection pool implementation; Default
no connection pool, in other words create a connection per statement execution.
Allowed values:
* *hikari* -
use [HikariCP](https://github.com/brettwooldridge/HikariCP)
* **maxtries** (optional) - number of times to retry retry-able errors; Default *3*.
* **minretrydelayms** (optional) - minimum time in ms to wait before retry with exponential backoff; Default *200*.
* **errors** (optional) - see `error-handlers` topic for details (`./nb help error-handlers`). Default *stop*.
#### errors parameter
This parameter expects an expression which specifies how to handle exceptions by class name
and SQL state code. Error names are formatted as `<exception-name>_<sql-state>`.
For example, a *org.postgresql.util.PSQLException* with *SQLState=80001* will be formatted `PSQLException_80001`.
To continue on such an error, use `errors=PQLException_80001:warn,count;stop`. To retry any
*java.sql.SQLTransientException* or any *SQLState=80001* and otherwise stop, use
`errors=SQLTransientException.*:warn,count,retry;.*80001:warn,count,retry;stop`.
See scenario implementations in workloads `cockroachdb-basic` and `postgres-basic` for reasonable defaults
of the errors parameter. This is a reasonable default error handler chain:
1. `SQLTransient.*:warn,count,retry` - log, emit metric, and retry on transient errors
([java.sql doc](https://docs.oracle.com/javase/8/docs/api/java/sql/SQLTransientException.html))
2. `.*0800.*:warn,count,retry` - log, emit metric, and retry on "connection exception" class of postgresql driver
SQLState codes ([postgresql java doc](https://www.postgresql.org/docs/9.4/errcodes-appendix.html))
3. `.*40001:count,retry` - emit metric and retry on "serialization error" SQLState code of postgresql driver
([postgresql java doc](https://www.postgresql.org/docs/9.4/errcodes-appendix.html)).
These are common with CockroachDB
([doc](https://www.cockroachlabs.com/docs/stable/error-handling-and-troubleshooting.html#transaction-retry-errors)).
4. `stop` - stop the activity for any other error or if max retries are exceeded

View File

@@ -1,53 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.cockroachdb;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import org.junit.jupiter.api.Test;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class CockroachActivityTest {
@Test
public void testErrorNameMapper() {
ActivityDef activityDef = new ActivityDef(ParameterMap.parseParams("").orElseThrow());
CockroachActivity activity = new CockroachActivity(activityDef);
// When the Throwable is a SQLException, the error name should be getSQLState()
Throwable sqlException = new SQLException("my test exception", "my-test-sql-state");
assertEquals("SQLException_my-test-sql-state", activity.errorNameMapper(sqlException));
// See PSQLState to string code mapping at the Github source code website
// https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
Throwable psqlException = new PSQLException("retry transaction", PSQLState.CONNECTION_FAILURE);
assertEquals("PSQLException_08006", activity.errorNameMapper(psqlException));
// When SQLState is null or empty, suffix shouldn't be underscore
Throwable nullSQLState = new PSQLException("my test runtime exception", null);
assertEquals("PSQLException", activity.errorNameMapper(nullSQLState));
// When Throwable is not a SQLException, the error name should be the class name
Throwable runtimeException = new SocketTimeoutException("my test runtime exception");
assertEquals("SocketTimeoutException", activity.errorNameMapper(runtimeException));
}
}

View File

@@ -1,46 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2022-2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nosqlbench</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>driver-jdbc</artifactId>
<dependencies>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,34 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.jdbc.api;
import io.nosqlbench.activitytype.jdbc.impl.JDBCAction;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
public class JDBCActionDispenser implements ActionDispenser {
private final JDBCActivity activity;
public JDBCActionDispenser(JDBCActivity a) {
activity = a;
}
@Override
public Action getAction(int slot) {
return new JDBCAction(activity, slot);
}
}

View File

@@ -1,145 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.jdbc.api;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.nosqlbench.activitytype.jdbc.impl.ReadyJDBCOp;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.function.Function;
// This should not be exposed as as service directly unless it can
// be used with a modular JDBC configuration.
public abstract class JDBCActivity extends SimpleActivity {
private final static Logger LOGGER = LogManager.getLogger(JDBCActivity.class);
private Timer bindTimer;
private Timer resultTimer;
private Timer resultSuccessTimer;
private Histogram triesHisto;
private int maxTries;
private int minRetryDelayMs;
protected DataSource dataSource;
protected OpSequence<OpDispenser<? extends String>> opSequence;
public JDBCActivity(ActivityDef activityDef) {
super(activityDef);
}
/*
Subclasses construct a DataSource object. Concrete type should *not* be a pooled DataSource,
as this class implements wrapping with HikariDataSource if required.
*/
protected abstract DataSource newDataSource();
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
this.maxTries = getParams().getOptionalInteger("maxtries").orElse(3);
this.minRetryDelayMs = getParams().getOptionalInteger("minretrydelayms").orElse(200);
LOGGER.debug("initializing data source");
dataSource = newDataSource();
String connectionPool = getParams().getOptionalString("connectionpool").orElse("");
if (!connectionPool.isEmpty()) {
LOGGER.debug("initializing connectionpool " + connectionPool);
if (connectionPool.equals("hikari")) {
HikariConfig config = new HikariConfig();
config.setDataSource(dataSource);
dataSource = new HikariDataSource(config);
} else {
throw new RuntimeException("unknown connectionpool parameter value " + connectionPool);
}
}
}
@Override
public void initActivity() {
LOGGER.debug("initializing activity: " + getActivityDef().getAlias());
bindTimer = ActivityMetrics.timer(getActivityDef(), "bind", this.getHdrDigits());
resultTimer = ActivityMetrics.timer(getActivityDef(), "result", this.getHdrDigits());
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success", this.getHdrDigits());
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries", this.getHdrDigits());
opSequence = createOpSequence(ReadyJDBCOp::new,false);
setDefaultsFromOpSequence(opSequence);
onActivityDefUpdate(getActivityDef());
}
public String errorNameMapper(Throwable e) {
StringBuilder sb = new StringBuilder(e.getClass().getSimpleName());
if (e instanceof SQLException) {
String sqlState = ((SQLException) e).getSQLState();
if (sqlState != null && !sqlState.isEmpty()) {
sb.append('_');
sb.append(sqlState);
}
}
return sb.toString();
}
@Override
public Function<Throwable, String> getErrorNameMapper() {
return this::errorNameMapper;
}
public int getMaxTries() {
return this.maxTries;
}
public int getMinRetryDelayMs() {
return this.minRetryDelayMs;
}
public DataSource getDataSource() {
return dataSource;
}
public OpSequence<OpDispenser<? extends String>> getOpSequence() {
return opSequence;
}
public Timer getBindTimer() {
return bindTimer;
}
public Timer getResultTimer() {
return resultTimer;
}
public Timer getResultSuccessTimer() {
return resultSuccessTimer;
}
public Histogram getTriesHisto() {
return triesHisto;
}
}

View File

@@ -1,111 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.jdbc.impl;
import com.codahale.metrics.Timer;
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
public class JDBCAction implements SyncAction {
private static final Logger LOGGER = LogManager.getLogger(JDBCAction.class);
private final JDBCActivity activity;
private OpSequence<OpDispenser<? extends String>> sequencer;
public JDBCAction(JDBCActivity a, int slot) {
activity = a;
}
@Override
public void init() {
sequencer = activity.getOpSequence();
}
@Override
public int runCycle(long cycle) {
String boundStmt;
LongFunction<? extends String> unboundStmt = sequencer.apply(cycle);
try (Timer.Context bindTime = activity.getBindTimer().time()) {
boundStmt = unboundStmt.apply(cycle);
}
int maxTries = activity.getMaxTries();
Exception error = null;
for (int tries = 1; tries <= maxTries; tries++) {
long startTimeNanos = System.nanoTime();
try (Connection conn = activity.getDataSource().getConnection()) {
Statement jdbcStmt = conn.createStatement();
jdbcStmt.execute(boundStmt);
} catch (Exception e) {
error = e;
}
long executionTimeNanos = System.nanoTime() - startTimeNanos;
activity.getResultTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
activity.getTriesHisto().update(tries);
if (error == null) {
activity.getResultSuccessTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
return 0;
} else {
ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, executionTimeNanos);
if (!detail.isRetryable()) {
LOGGER.debug("Exit failure after non-retryable error");
throw new RuntimeException("non-retryable error", error);
}
}
try {
int retryDelay = retryDelayMs(tries, activity.getMinRetryDelayMs());
LOGGER.debug("tries=" + tries + " sleeping for " + retryDelay + " ms");
Thread.sleep(retryDelay);
} catch (InterruptedException e) {
throw new RuntimeException("thread interrupted", e);
}
}
LOGGER.debug("Exit failure after maxretries=" + maxTries);
throw new RuntimeException("maxtries exceeded", error);
}
/**
* Compute retry delay based on exponential backoff with full jitter
* @param tries 1-indexed
* @param minDelayMs lower bound of retry delay
* @return retry delay
*/
private int retryDelayMs(int tries, int minDelayMs) {
int exponentialDelay = minDelayMs * (int) Math.pow(2.0, tries - 1);
return (int) (Math.random() * exponentialDelay);
}
}

View File

@@ -1,42 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.activitytype.jdbc.impl;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
public class ReadyJDBCOp extends BaseOpDispenser<String> {
private final StringBindings bindings;
public ReadyJDBCOp(OpTemplate stmtDef) {
super(stmtDef);
ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt().orElseThrow(), stmtDef.getBindings());
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt().orElseThrow(), paramBindings);
bindings = template.resolve();
}
@Override
public String apply(long cycle) {
return bindings.bind(cycle);
}
}

View File

@@ -1,75 +0,0 @@
<!--
~ Copyright (c) 2022-2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-kafka</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A Kafka driver for nosqlbench. This provides the ability to inject synthetic data
into a kafka topic.
</description>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.1</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>

View File

@@ -1,50 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.ebdrivers.kafkaproducer;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
public class KafkaAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(KafkaAction.class);
private final KafkaProducerActivity activity;
private final int slot;
private OpSequence<KafkaStatement> sequencer;
public KafkaAction(KafkaProducerActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
}
@Override
public void init() {
this.sequencer = activity.getOpSequencer();
}
@Override
public int runCycle(long cycle) {
sequencer.apply(cycle).write(cycle);
return 1;
}
}

View File

@@ -1,114 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.ebdrivers.kafkaproducer;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class KafkaProducerActivity extends SimpleActivity {
private final static Logger logger = LogManager.getLogger(KafkaProducerActivity.class);
private String yamlLoc;
private String clientId;
private String servers;
private OpSequence<KafkaStatement> opSequence;
private String schemaRegistryUrl;
Timer resultTimer;
Timer resultSuccessTimer;
public KafkaProducerActivity(ActivityDef activityDef) {
super(activityDef);
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
// sanity check
yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
.orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
.orElse("localhost" + ":9092")
.split(","))
.map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
.collect(Collectors.joining(","));
clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
.orElse("TestProducerClientId");
schemaRegistryUrl = activityDef.getParams()
.getOptionalString("schema_registry_url", "schema.registry.url")
.orElse("http://localhost:8081");
}
@Override
public void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
onActivityDefUpdate(activityDef);
opSequence = initOpSequencer();
setDefaultsFromOpSequence(opSequence);
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
}
private OpSequence<KafkaStatement> initOpSequencer() {
SequencerType sequencerType = SequencerType.valueOf(
getParams().getOptionalString("seq").orElse("bucket")
);
SequencePlanner<KafkaStatement> sequencer = new SequencePlanner<>(sequencerType);
String tagFilter = activityDef.getParams().getOptionalString("tags").orElse("");
StmtsDocList stmtsDocList = StatementsLoader.loadPath(logger, yamlLoc, activityDef.getParams(), "activities");
List<OpTemplate> statements = stmtsDocList.getStmts(tagFilter);
String format = getParams().getOptionalString("format").orElse(null);
if (statements.size() > 0) {
for (OpTemplate statement : statements) {
sequencer.addOp(
new KafkaStatement(statement,
servers,
clientId,
schemaRegistryUrl),
statement.getParamOrDefault("ratio", 1)
);
}
} else {
logger.error("Unable to create a Kafka statement if you have no active statements.");
}
return sequencer.resolve();
}
protected OpSequence<KafkaStatement> getOpSequencer() {
return opSequence;
}
}

View File

@@ -1,50 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.ebdrivers.kafkaproducer;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "kafkaproducer")
public class KafkaProducerActivityType implements ActivityType<KafkaProducerActivity> {
@Override
public KafkaProducerActivity getActivity(ActivityDef activityDef) {
return new KafkaProducerActivity(activityDef);
}
private static class Dispenser implements ActionDispenser {
private final KafkaProducerActivity activity;
private Dispenser(KafkaProducerActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new KafkaAction(this.activity, slot);
}
}
@Override
public ActionDispenser getActionDispenser(KafkaProducerActivity activity) {
return new Dispenser(activity);
}
}

View File

@@ -1,157 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.ebdrivers.kafkaproducer;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedStringTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaStatement {
private final static Logger logger = LogManager.getLogger(KafkaStatement.class);
private Producer<Object,Object> producer = null;
private final StringBindings bindings;
private final String topic;
private final String keySerializerClass;
private final String valueSerializerClass;
private AvroSchema keySerializerSchema = null;
private AvroSchema valueSerializerSchema = null;
private final String key;
public KafkaStatement(OpTemplate stmtDef, String servers, String clientId, String schemaRegistryUrl) {
ParsedStringTemplate paramTemplate = new ParsedStringTemplate(stmtDef.getStmt().orElseThrow(), stmtDef.getBindings());
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt().orElseThrow(), paramBindings);
this.bindings = template.resolve();
// Process key serializer class and schema, if any
this.keySerializerClass =
stmtDef.getOptionalStringParam("key_serializer_class")
.orElse(StringSerializer.class.getName());
Optional<String> keySerializerSchemaFile =
stmtDef.getOptionalStringParam("key_serializer_schema_file");
if (keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
&& keySerializerSchemaFile.isEmpty() ) {
throw new RuntimeException("KafkaAvroSerializer requires key_serializer_schema_file");
}
if (keySerializerSchemaFile.isPresent()) {
Path schemaFilePath = Path.of(keySerializerSchemaFile.get());
try {
this.keySerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
} catch (IOException e) {
throw new RuntimeException("Error reading key schema file: " + keySerializerSchemaFile, e);
}
}
// Process value serializer class and schema, if any
this.valueSerializerClass =
stmtDef.getOptionalStringParam("value_serializer_class")
.orElse(StringSerializer.class.getName());
Optional<String> valueSerializerSchemaFile =
stmtDef.getOptionalStringParam("value_serializer_schema_file");
if (valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
&& valueSerializerSchemaFile.isEmpty() ) {
throw new RuntimeException("KafkaAvroSerializer requires value_serializer_schema_file");
}
if (valueSerializerSchemaFile.isPresent()) {
Path schemaFilePath = Path.of(valueSerializerSchemaFile.get());
try {
this.valueSerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
} catch (IOException e) {
throw new RuntimeException("Error reading value schema file: " + valueSerializerSchemaFile, e);
}
}
this.topic = stmtDef.getParamOrDefault("topic","default-topic");
this.key = stmtDef.getOptionalStringParam("key").orElse("key");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
props.put("schema.registry.url", schemaRegistryUrl);
try {
producer = new KafkaProducer<>(props);
} catch (Exception e) {
logger.error("Error constructing kafka producer", e);
}
}
private Object bindKey(long cycle) {
Object statement = key;
if (keySerializerClass != null &&
keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
try {
statement = AvroSchemaUtils.toObject((String)statement, keySerializerSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return statement;
}
private Object bindValue(long cycle) {
Object statement = bindings.bind(cycle);
if (valueSerializerClass != null &&
valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
try {
statement = AvroSchemaUtils.toObject((String)statement, valueSerializerSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return statement;
}
public void write(long cycle) {
Object key = bindKey(cycle);
Object value = bindValue(cycle);
try {
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> send = producer.send(record);
RecordMetadata result = send.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,32 +0,0 @@
# kafkaproducer
This is an activity type which allows for a stream of data to be sent to a kafka topic. It is based on the stdout
activity statement format.
## Parameters
- **topic** - The topic to write to for this activity.
### Examples
Refer to the online standard YAML documentation for a detailed walk-through.
An example yaml is below for sending structured JSON to a kafka topic:
bindings:
price: Normal(10.0D,2.0D) -> double; Save('price') -> double;
quantity: Normal(10000.0D,100.0D); Add(-10000.0D); Save('quantity') -> double;
total: Identity(); Expr('price * quantity') -> double;
client: WeightedStrings('ABC_TEST:3;DFG_TEST:3;STG_TEST:14');
clientid: HashRange(0,1000000000) -> long;
statements:
- |
\{
"trade": \{
"price": {price},
"quantity": {quantity},
"total": {total},
"client": "{client}",
"clientid":"{clientid}"
\}
\}

View File

@@ -1,93 +0,0 @@
<!--
~ Copyright (c) 2022-2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-pulsar</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A Pulsar driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system.
</description>
<properties>
<pulsar.version>2.10.2</pulsar.version>
</properties>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
</project>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -39,31 +39,31 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-spectest</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docker</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -38,7 +38,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -56,7 +56,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -28,7 +28,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -22,7 +22,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -19,7 +19,7 @@
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -58,7 +58,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -160,10 +160,16 @@ public class NBConfiguration {
if (o==null) {
return Optional.empty();
}
if (type.isAssignableFrom(o.getClass())) {
if (type.isInstance(o)) {
return Optional.of((T) o);
} else if (type.isAssignableFrom(o.getClass())) {
return Optional.of((T)type.cast(o));
} else if (NBTypeConverter.canConvert(o, type)) {
return Optional.of((T) NBTypeConverter.convert(o, type));
} else {
throw new NBConfigError("config param " + Arrays.toString(names) +" was not assignable to class '" + type.getCanonicalName() + "'");
}
throw new NBConfigError("config param " + Arrays.toString(names) +" was not assignable to class '" + type.getCanonicalName() + "'");
}
public <T> T getOrDefault(String name, T defaultValue) {

View File

@@ -20,7 +20,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -40,7 +40,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nbr</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
@@ -55,37 +55,37 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-tcp</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-mongodb</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-stdout</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-diag</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-dynamodb</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-cqld4</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
@@ -97,13 +97,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-http</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-pulsar</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
@@ -115,13 +115,19 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-s4j</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-tcp</artifactId>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-kafka</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
@@ -208,7 +214,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-mongodb</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>

View File

@@ -20,7 +20,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -39,13 +39,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nbr</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-diag</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -21,7 +21,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -40,31 +40,31 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docs</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-extensions</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-diag</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -30,7 +30,7 @@ public class MetricsIntegrationTest {
@Test
public void testHistogramLogger() {
ActivityDef ad = ActivityDef.parseActivityDef("alias=foo;driver=diag;op=noop");
Histogram testhistogram = ActivityMetrics.histogram(ad, "testhistogram", 4);
Histogram testhistogram = ActivityMetrics.histogram(ad, "testhistogram", 3);
ActivityMetrics.addHistoLogger("testsession", ".*","testhisto.log","1s");
testhistogram.update(400);
testhistogram.getSnapshot();

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>mvn-defaults</relativePath>
</parent>

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -23,14 +23,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<artifactId>nb-api</artifactId>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lang</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>

View File

@@ -23,7 +23,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -20,13 +20,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -20,7 +20,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@@ -34,42 +34,42 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-realdata</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-realer</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-random</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-curves4</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.33-SNAPSHOT</version>
<scope>compile</scope>
</dependency>