mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-16 17:34:52 -06:00
removed stale modules
This commit is contained in:
parent
bca21e86b8
commit
0eb71686a4
@ -1,49 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright (c) 2022-2023 nosqlbench
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>mvn-defaults</artifactId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
<relativePath>../mvn-defaults</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>driver-cockroachdb</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
|
||||
<description>
|
||||
A CockroachDB ActivityType driver for http://nosqlbench.io/
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>driver-jdbc</artifactId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>42.5.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.cockroachdb;
|
||||
|
||||
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.postgresql.ds.PGSimpleDataSource;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class CockroachActivity extends JDBCActivity {
|
||||
private static final Logger LOGGER = LogManager.getLogger(CockroachActivity.class);
|
||||
|
||||
public CockroachActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
}
|
||||
|
||||
// TODO provide an error handler with sane defaults including
|
||||
// * retry on 40001 SQL state code (CockroachDB txn retry)
|
||||
// * retry (implement exponential, to avoid stampeding herd) on timeout getting connection from connection pool
|
||||
//
|
||||
//@Override
|
||||
//public NBErrorHandler getErrorHandler() {
|
||||
//}
|
||||
|
||||
@Override
|
||||
protected DataSource newDataSource() {
|
||||
PGSimpleDataSource ds = new PGSimpleDataSource();
|
||||
|
||||
// serverName is required
|
||||
String serverName = getParams().
|
||||
getOptionalString("serverName").
|
||||
orElseThrow(() -> new RuntimeException("serverName parameter required"));
|
||||
|
||||
// portNumber, databaseName, user, password are optional
|
||||
Integer portNumber = getParams().getOptionalInteger("portNumber").orElse(26257);
|
||||
String databaseName = getParams().getOptionalString("databaseName").orElse(null);
|
||||
String user = getParams().getOptionalString("user").orElse(null);
|
||||
String password = getParams().getOptionalString("password").orElse(null);
|
||||
|
||||
ds.setServerNames(new String[]{serverName});
|
||||
ds.setPortNumbers(new int[]{portNumber});
|
||||
if (databaseName != null) {
|
||||
ds.setDatabaseName(databaseName);
|
||||
}
|
||||
if (user != null) {
|
||||
ds.setUser(user);
|
||||
}
|
||||
if (password != null) {
|
||||
ds.setPassword(password);
|
||||
}
|
||||
|
||||
LOGGER.debug("Final DataSource fields:"
|
||||
+ " serverNames=" + Arrays.toString(ds.getServerNames())
|
||||
+ " portNumbers=" + Arrays.toString(ds.getPortNumbers())
|
||||
+ " databaseName=" + ds.getDatabaseName()
|
||||
+ " user=" + ds.getUser()
|
||||
+ " password=" + ds.getPassword());
|
||||
|
||||
return ds;
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.cockroachdb;
|
||||
|
||||
import io.nosqlbench.activitytype.jdbc.api.JDBCActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
@Service(value = ActivityType.class, selector = "cockroachdb")
|
||||
public class CockroachActivityType implements ActivityType<CockroachActivity> {
|
||||
|
||||
@Override
|
||||
public ActionDispenser getActionDispenser(CockroachActivity activity) {
|
||||
return new JDBCActionDispenser(activity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CockroachActivity getActivity(ActivityDef activityDef) {
|
||||
return new CockroachActivity(activityDef);
|
||||
}
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
min_version: "4.17.15"
|
||||
|
||||
description: An example of a basic cockroach insert
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
main: |
|
||||
run driver=cockroachdb tags==block:"main.*" threads=auto cycles===<<main-cycles:1000000>>
|
||||
serverName=localhost connectionpool=hikari
|
||||
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;.*40001:count,retry;stop
|
||||
rampup: |
|
||||
run driver=cockroachdb tags==block:rampup threads=auto cycles===<<rampup-cycles:1000000>>
|
||||
serverName=localhost connectionpool=hikari
|
||||
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;.*40001:count,retry;stop
|
||||
schema: |
|
||||
run driver=cockroachdb tags==block:schema threads===1 serverName=localhost
|
||||
|
||||
bindings:
|
||||
seq_key: Mod(<<keyCount:1000000>>L); ToInt()
|
||||
seq_value: Mod(<<valueCount:1000000000>>L); <<valueSizeDist:Hash()>>; ToString() -> String
|
||||
rw_key: <<keyDist:Uniform(0,1000000)->long>>; ToInt()
|
||||
rw_value: <<valDist:Uniform(0,1000000000)->int>>; <<valueSizeDist:Hash()>>; ToString() -> String
|
||||
|
||||
blocks:
|
||||
schema:
|
||||
ops:
|
||||
create-database: |
|
||||
CREATE DATABASE <<database:bank>>;
|
||||
create-table: |
|
||||
CREATE TABLE IF NOT EXISTS <<database:bank>>.<<table:banktransaction>> (
|
||||
code STRING PRIMARY KEY,
|
||||
amount INTEGER
|
||||
);
|
||||
rampup:
|
||||
ops:
|
||||
rampup-insert: |
|
||||
INSERT INTO "<<database:bank>>"."<<table:banktransaction>>"
|
||||
(code, amount) VALUES ('{seq_key}', {seq_value})
|
||||
ON CONFLICT (code) DO NOTHING;
|
||||
main-read:
|
||||
params:
|
||||
ratio: <<read_ratio:1>>
|
||||
ops:
|
||||
main-find: |
|
||||
SELECT code, amount FROM "<<database:bank>>"."<<table:banktransaction>>"
|
||||
WHERE code = '{rw_key}' AND amount = {rw_value};
|
||||
main-write:
|
||||
params:
|
||||
ratio: <<write_ratio:1>>
|
||||
ops:
|
||||
main-insert: |
|
||||
UPDATE "<<database:bank>>"."<<table:banktransaction>>" SET amount = {seq_value} WHERE code = '{seq_key}';
|
@ -1,66 +0,0 @@
|
||||
min_version: "4.17.15"
|
||||
|
||||
description: An example of a basic postgres bank transaction workload
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
main: |
|
||||
run driver===cockroachdb tags===block:"main.*" threads=auto cycles=10000000
|
||||
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
|
||||
password=postgres connectionpool=hikari
|
||||
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
|
||||
rampup: |
|
||||
run driver===cockroachdb tags===block:rampup threads=auto cycles=<<accounts:1000000>>
|
||||
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
|
||||
password=postgres connectionpool=hikari filler-binding="AlphaNumericString(10)"
|
||||
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
|
||||
rampup-large: |
|
||||
run driver===cockroachdb tags===block:rampup threads=auto cycles=<<accounts:1000000>>
|
||||
serverName=localhost portNumber=5432 databaseName=<<database:bank>> user=postgres
|
||||
password=postgres connectionpool=hikari
|
||||
errors=SQLTransient.*:warn,count,retry;.*0800.*:warn,count,retry;stop
|
||||
schema: |
|
||||
run driver===cockroachdb tags===block:schema threads===1 serverName=localhost portNumber=5432
|
||||
databaseName=bank user=postgres password=postgres
|
||||
|
||||
bindings:
|
||||
seq_uuid: Mod(<<accounts:1000000>>L); ToHashedUUID()
|
||||
rand_uuid: Uniform(0,<<accounts:1000000>>L); ToHashedUUID()
|
||||
rand_amount: Poisson(2000000); ToInt()
|
||||
timestamp: StartingEpochMillis('2018-01-01 00:00:00'); ToDateTime()
|
||||
filler: <<filler-binding:HashedLineToStringList('data/lorem_ipsum_full.txt', 150, 150)>>; ToString()
|
||||
|
||||
blocks:
|
||||
schema:
|
||||
ops:
|
||||
create-table: |
|
||||
CREATE TABLE IF NOT EXISTS "<<table:account>>" (
|
||||
uuid UUID PRIMARY KEY,
|
||||
amount INTEGER,
|
||||
amount_unit VARCHAR(64),
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||
filler TEXT
|
||||
);
|
||||
create-indices: |
|
||||
CREATE INDEX IF NOT EXISTS amount_idx on "<<table:account>>" (amount);
|
||||
CREATE INDEX IF NOT EXISTS updated_at_idx on "<<table:account>>" (updated_at);
|
||||
|
||||
rampup:
|
||||
ops:
|
||||
rampup-insert: |
|
||||
INSERT INTO "<<table:account>>" (uuid, amount, amount_unit, updated_at, created_at, filler)
|
||||
VALUES ('{seq_uuid}', {rand_amount}, 'us_cents', '{timestamp}', '{timestamp}', '{filler}')
|
||||
ON CONFLICT DO NOTHING;
|
||||
main-read:
|
||||
params:
|
||||
ratio: <<read_ratio:2>>
|
||||
ops:
|
||||
main-find: |
|
||||
SELECT * FROM "<<table:account>>" WHERE uuid = '{rand_uuid}';
|
||||
main-write:
|
||||
params:
|
||||
ratio: <<write_ratio:1>>
|
||||
ops:
|
||||
main-insert: |
|
||||
UPDATE "<<table:account>>" SET amount = {rand_amount}, updated_at = '{timestamp}' WHERE uuid = '{rand_uuid}';
|
@ -1,50 +0,0 @@
|
||||
# CockroachDB Driver
|
||||
|
||||
This is a driver for CockroachDB. It extends the generic JDBC Driver and
|
||||
inherits its parameters.
|
||||
|
||||
### CockroachDB driver parameters
|
||||
|
||||
All parameters correspond to the postgresql JDBC library parameters. See
|
||||
the
|
||||
[DataSource Configuration Properties](https://jdbc.postgresql.org/documentation/81/ds-ds.html)
|
||||
section for detailed parameter documentation.
|
||||
|
||||
* **serverName** (required) - database hostname.
|
||||
* **databaseName** (optional) - database namespace to use; Default *null*.
|
||||
* **portNumber** (optional) - database listen port; Default *26257*.
|
||||
* **user** (optional) - database account username; Default *null*.
|
||||
* **password** (optional) - database account password; Default *null*.
|
||||
* **connectionpool** (optional) - connection pool implementation; Default
|
||||
no connection pool, in other words create a connection per statement execution.
|
||||
Allowed values:
|
||||
* *hikari* -
|
||||
use [HikariCP](https://github.com/brettwooldridge/HikariCP)
|
||||
* **maxtries** (optional) - number of times to retry retry-able errors; Default *3*.
|
||||
* **minretrydelayms** (optional) - minimum time in ms to wait before retry with exponential backoff; Default *200*.
|
||||
* **errors** (optional) - see `error-handlers` topic for details (`./nb help error-handlers`). Default *stop*.
|
||||
|
||||
#### errors parameter
|
||||
|
||||
This parameter expects an expression which specifies how to handle exceptions by class name
|
||||
and SQL state code. Error names are formatted as `<exception-name>_<sql-state>`.
|
||||
|
||||
For example, a *org.postgresql.util.PSQLException* with *SQLState=80001* will be formatted `PSQLException_80001`.
|
||||
To continue on such an error, use `errors=PQLException_80001:warn,count;stop`. To retry any
|
||||
*java.sql.SQLTransientException* or any *SQLState=80001* and otherwise stop, use
|
||||
`errors=SQLTransientException.*:warn,count,retry;.*80001:warn,count,retry;stop`.
|
||||
|
||||
See scenario implementations in workloads `cockroachdb-basic` and `postgres-basic` for reasonable defaults
|
||||
of the errors parameter. This is a reasonable default error handler chain:
|
||||
|
||||
1. `SQLTransient.*:warn,count,retry` - log, emit metric, and retry on transient errors
|
||||
([java.sql doc](https://docs.oracle.com/javase/8/docs/api/java/sql/SQLTransientException.html))
|
||||
2. `.*0800.*:warn,count,retry` - log, emit metric, and retry on "connection exception" class of postgresql driver
|
||||
SQLState codes ([postgresql java doc](https://www.postgresql.org/docs/9.4/errcodes-appendix.html))
|
||||
3. `.*40001:count,retry` - emit metric and retry on "serialization error" SQLState code of postgresql driver
|
||||
([postgresql java doc](https://www.postgresql.org/docs/9.4/errcodes-appendix.html)).
|
||||
These are common with CockroachDB
|
||||
([doc](https://www.cockroachlabs.com/docs/stable/error-handling-and-troubleshooting.html#transaction-retry-errors)).
|
||||
4. `stop` - stop the activity for any other error or if max retries are exceeded
|
||||
|
||||
|
@ -1,53 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.cockroachdb;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.postgresql.util.PSQLException;
|
||||
import org.postgresql.util.PSQLState;
|
||||
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class CockroachActivityTest {
|
||||
@Test
|
||||
public void testErrorNameMapper() {
|
||||
ActivityDef activityDef = new ActivityDef(ParameterMap.parseParams("").orElseThrow());
|
||||
CockroachActivity activity = new CockroachActivity(activityDef);
|
||||
|
||||
// When the Throwable is a SQLException, the error name should be getSQLState()
|
||||
Throwable sqlException = new SQLException("my test exception", "my-test-sql-state");
|
||||
assertEquals("SQLException_my-test-sql-state", activity.errorNameMapper(sqlException));
|
||||
|
||||
// See PSQLState to string code mapping at the Github source code website
|
||||
// https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
|
||||
Throwable psqlException = new PSQLException("retry transaction", PSQLState.CONNECTION_FAILURE);
|
||||
assertEquals("PSQLException_08006", activity.errorNameMapper(psqlException));
|
||||
|
||||
// When SQLState is null or empty, suffix shouldn't be underscore
|
||||
Throwable nullSQLState = new PSQLException("my test runtime exception", null);
|
||||
assertEquals("PSQLException", activity.errorNameMapper(nullSQLState));
|
||||
|
||||
// When Throwable is not a SQLException, the error name should be the class name
|
||||
Throwable runtimeException = new SocketTimeoutException("my test runtime exception");
|
||||
assertEquals("SocketTimeoutException", activity.errorNameMapper(runtimeException));
|
||||
}
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright (c) 2022-2023 nosqlbench
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nosqlbench</artifactId>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>driver-jdbc</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.zaxxer</groupId>
|
||||
<artifactId>HikariCP</artifactId>
|
||||
<version>3.4.5</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>engine-api</artifactId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
|
||||
</project>
|
@ -1,34 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.jdbc.api;
|
||||
|
||||
import io.nosqlbench.activitytype.jdbc.impl.JDBCAction;
|
||||
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
|
||||
public class JDBCActionDispenser implements ActionDispenser {
|
||||
private final JDBCActivity activity;
|
||||
|
||||
public JDBCActionDispenser(JDBCActivity a) {
|
||||
activity = a;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Action getAction(int slot) {
|
||||
return new JDBCAction(activity, slot);
|
||||
}
|
||||
}
|
@ -1,145 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.jdbc.api;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import io.nosqlbench.activitytype.jdbc.impl.ReadyJDBCOp;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.SQLException;
|
||||
import java.util.function.Function;
|
||||
|
||||
// This should not be exposed as as service directly unless it can
|
||||
// be used with a modular JDBC configuration.
|
||||
public abstract class JDBCActivity extends SimpleActivity {
|
||||
private final static Logger LOGGER = LogManager.getLogger(JDBCActivity.class);
|
||||
private Timer bindTimer;
|
||||
private Timer resultTimer;
|
||||
private Timer resultSuccessTimer;
|
||||
private Histogram triesHisto;
|
||||
private int maxTries;
|
||||
private int minRetryDelayMs;
|
||||
|
||||
protected DataSource dataSource;
|
||||
protected OpSequence<OpDispenser<? extends String>> opSequence;
|
||||
|
||||
public JDBCActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
}
|
||||
|
||||
/*
|
||||
Subclasses construct a DataSource object. Concrete type should *not* be a pooled DataSource,
|
||||
as this class implements wrapping with HikariDataSource if required.
|
||||
*/
|
||||
protected abstract DataSource newDataSource();
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
this.maxTries = getParams().getOptionalInteger("maxtries").orElse(3);
|
||||
this.minRetryDelayMs = getParams().getOptionalInteger("minretrydelayms").orElse(200);
|
||||
|
||||
LOGGER.debug("initializing data source");
|
||||
dataSource = newDataSource();
|
||||
|
||||
String connectionPool = getParams().getOptionalString("connectionpool").orElse("");
|
||||
if (!connectionPool.isEmpty()) {
|
||||
LOGGER.debug("initializing connectionpool " + connectionPool);
|
||||
if (connectionPool.equals("hikari")) {
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setDataSource(dataSource);
|
||||
dataSource = new HikariDataSource(config);
|
||||
} else {
|
||||
throw new RuntimeException("unknown connectionpool parameter value " + connectionPool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initActivity() {
|
||||
LOGGER.debug("initializing activity: " + getActivityDef().getAlias());
|
||||
bindTimer = ActivityMetrics.timer(getActivityDef(), "bind", this.getHdrDigits());
|
||||
resultTimer = ActivityMetrics.timer(getActivityDef(), "result", this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success", this.getHdrDigits());
|
||||
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries", this.getHdrDigits());
|
||||
|
||||
opSequence = createOpSequence(ReadyJDBCOp::new,false);
|
||||
setDefaultsFromOpSequence(opSequence);
|
||||
|
||||
onActivityDefUpdate(getActivityDef());
|
||||
}
|
||||
|
||||
public String errorNameMapper(Throwable e) {
|
||||
StringBuilder sb = new StringBuilder(e.getClass().getSimpleName());
|
||||
if (e instanceof SQLException) {
|
||||
String sqlState = ((SQLException) e).getSQLState();
|
||||
if (sqlState != null && !sqlState.isEmpty()) {
|
||||
sb.append('_');
|
||||
sb.append(sqlState);
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<Throwable, String> getErrorNameMapper() {
|
||||
return this::errorNameMapper;
|
||||
}
|
||||
|
||||
public int getMaxTries() {
|
||||
return this.maxTries;
|
||||
}
|
||||
|
||||
public int getMinRetryDelayMs() {
|
||||
return this.minRetryDelayMs;
|
||||
}
|
||||
|
||||
public DataSource getDataSource() {
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public OpSequence<OpDispenser<? extends String>> getOpSequence() {
|
||||
return opSequence;
|
||||
}
|
||||
|
||||
public Timer getBindTimer() {
|
||||
return bindTimer;
|
||||
}
|
||||
|
||||
public Timer getResultTimer() {
|
||||
return resultTimer;
|
||||
}
|
||||
|
||||
public Timer getResultSuccessTimer() {
|
||||
return resultSuccessTimer;
|
||||
}
|
||||
|
||||
public Histogram getTriesHisto() {
|
||||
return triesHisto;
|
||||
}
|
||||
}
|
@ -1,111 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.jdbc.impl;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.Statement;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class JDBCAction implements SyncAction {
|
||||
private static final Logger LOGGER = LogManager.getLogger(JDBCAction.class);
|
||||
|
||||
private final JDBCActivity activity;
|
||||
private OpSequence<OpDispenser<? extends String>> sequencer;
|
||||
|
||||
public JDBCAction(JDBCActivity a, int slot) {
|
||||
activity = a;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
sequencer = activity.getOpSequence();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
String boundStmt;
|
||||
|
||||
LongFunction<? extends String> unboundStmt = sequencer.apply(cycle);
|
||||
|
||||
try (Timer.Context bindTime = activity.getBindTimer().time()) {
|
||||
boundStmt = unboundStmt.apply(cycle);
|
||||
}
|
||||
|
||||
int maxTries = activity.getMaxTries();
|
||||
Exception error = null;
|
||||
|
||||
for (int tries = 1; tries <= maxTries; tries++) {
|
||||
long startTimeNanos = System.nanoTime();
|
||||
|
||||
try (Connection conn = activity.getDataSource().getConnection()) {
|
||||
Statement jdbcStmt = conn.createStatement();
|
||||
jdbcStmt.execute(boundStmt);
|
||||
|
||||
} catch (Exception e) {
|
||||
error = e;
|
||||
}
|
||||
|
||||
long executionTimeNanos = System.nanoTime() - startTimeNanos;
|
||||
|
||||
activity.getResultTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||
activity.getTriesHisto().update(tries);
|
||||
|
||||
if (error == null) {
|
||||
activity.getResultSuccessTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||
return 0;
|
||||
} else {
|
||||
ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, executionTimeNanos);
|
||||
if (!detail.isRetryable()) {
|
||||
LOGGER.debug("Exit failure after non-retryable error");
|
||||
throw new RuntimeException("non-retryable error", error);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
int retryDelay = retryDelayMs(tries, activity.getMinRetryDelayMs());
|
||||
LOGGER.debug("tries=" + tries + " sleeping for " + retryDelay + " ms");
|
||||
Thread.sleep(retryDelay);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("thread interrupted", e);
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.debug("Exit failure after maxretries=" + maxTries);
|
||||
throw new RuntimeException("maxtries exceeded", error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute retry delay based on exponential backoff with full jitter
|
||||
* @param tries 1-indexed
|
||||
* @param minDelayMs lower bound of retry delay
|
||||
* @return retry delay
|
||||
*/
|
||||
private int retryDelayMs(int tries, int minDelayMs) {
|
||||
int exponentialDelay = minDelayMs * (int) Math.pow(2.0, tries - 1);
|
||||
return (int) (Math.random() * exponentialDelay);
|
||||
}
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.activitytype.jdbc.impl;
|
||||
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
|
||||
import io.nosqlbench.virtdata.core.templates.StringBindings;
|
||||
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
|
||||
|
||||
public class ReadyJDBCOp extends BaseOpDispenser<String> {
|
||||
private final StringBindings bindings;
|
||||
|
||||
public ReadyJDBCOp(OpTemplate stmtDef) {
|
||||
super(stmtDef);
|
||||
ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt().orElseThrow(), stmtDef.getBindings());
|
||||
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
|
||||
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt().orElseThrow(), paramBindings);
|
||||
|
||||
bindings = template.resolve();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String apply(long cycle) {
|
||||
return bindings.bind(cycle);
|
||||
}
|
||||
}
|
@ -1,75 +0,0 @@
|
||||
<!--
|
||||
~ Copyright (c) 2022-2023 nosqlbench
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>mvn-defaults</artifactId>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
<relativePath>../mvn-defaults</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>driver-kafka</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
|
||||
<description>
|
||||
A Kafka driver for nosqlbench. This provides the ability to inject synthetic data
|
||||
into a kafka topic.
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- core dependencies -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.8.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>1.11.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>7.2.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>engine-api</artifactId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>https://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
</project>
|
@ -1,50 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.datastax.ebdrivers.kafkaproducer;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
|
||||
public class KafkaAction implements SyncAction {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(KafkaAction.class);
|
||||
|
||||
private final KafkaProducerActivity activity;
|
||||
private final int slot;
|
||||
|
||||
private OpSequence<KafkaStatement> sequencer;
|
||||
|
||||
public KafkaAction(KafkaProducerActivity activity, int slot) {
|
||||
this.activity = activity;
|
||||
this.slot = slot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.sequencer = activity.getOpSequencer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
sequencer.apply(cycle).write(cycle);
|
||||
return 1;
|
||||
}
|
||||
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.datastax.ebdrivers.kafkaproducer;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
|
||||
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
|
||||
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class KafkaProducerActivity extends SimpleActivity {
|
||||
private final static Logger logger = LogManager.getLogger(KafkaProducerActivity.class);
|
||||
private String yamlLoc;
|
||||
private String clientId;
|
||||
private String servers;
|
||||
private OpSequence<KafkaStatement> opSequence;
|
||||
private String schemaRegistryUrl;
|
||||
Timer resultTimer;
|
||||
Timer resultSuccessTimer;
|
||||
|
||||
|
||||
public KafkaProducerActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
// sanity check
|
||||
yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
|
||||
.orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
|
||||
servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
|
||||
.orElse("localhost" + ":9092")
|
||||
.split(","))
|
||||
.map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
|
||||
.collect(Collectors.joining(","));
|
||||
clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
|
||||
.orElse("TestProducerClientId");
|
||||
schemaRegistryUrl = activityDef.getParams()
|
||||
.getOptionalString("schema_registry_url", "schema.registry.url")
|
||||
.orElse("http://localhost:8081");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initActivity() {
|
||||
logger.debug("initializing activity: " + this.activityDef.getAlias());
|
||||
onActivityDefUpdate(activityDef);
|
||||
|
||||
opSequence = initOpSequencer();
|
||||
setDefaultsFromOpSequence(opSequence);
|
||||
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
|
||||
}
|
||||
|
||||
private OpSequence<KafkaStatement> initOpSequencer() {
|
||||
SequencerType sequencerType = SequencerType.valueOf(
|
||||
getParams().getOptionalString("seq").orElse("bucket")
|
||||
);
|
||||
SequencePlanner<KafkaStatement> sequencer = new SequencePlanner<>(sequencerType);
|
||||
|
||||
String tagFilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
StmtsDocList stmtsDocList = StatementsLoader.loadPath(logger, yamlLoc, activityDef.getParams(), "activities");
|
||||
List<OpTemplate> statements = stmtsDocList.getStmts(tagFilter);
|
||||
|
||||
String format = getParams().getOptionalString("format").orElse(null);
|
||||
|
||||
if (statements.size() > 0) {
|
||||
for (OpTemplate statement : statements) {
|
||||
sequencer.addOp(
|
||||
new KafkaStatement(statement,
|
||||
servers,
|
||||
clientId,
|
||||
schemaRegistryUrl),
|
||||
statement.getParamOrDefault("ratio", 1)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
logger.error("Unable to create a Kafka statement if you have no active statements.");
|
||||
}
|
||||
|
||||
return sequencer.resolve();
|
||||
}
|
||||
|
||||
protected OpSequence<KafkaStatement> getOpSequencer() {
|
||||
return opSequence;
|
||||
}
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.datastax.ebdrivers.kafkaproducer;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
@Service(value = ActivityType.class, selector = "kafkaproducer")
|
||||
public class KafkaProducerActivityType implements ActivityType<KafkaProducerActivity> {
|
||||
|
||||
@Override
|
||||
public KafkaProducerActivity getActivity(ActivityDef activityDef) {
|
||||
return new KafkaProducerActivity(activityDef);
|
||||
}
|
||||
|
||||
private static class Dispenser implements ActionDispenser {
|
||||
private final KafkaProducerActivity activity;
|
||||
|
||||
private Dispenser(KafkaProducerActivity activity) {
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Action getAction(int slot) {
|
||||
return new KafkaAction(this.activity, slot);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionDispenser getActionDispenser(KafkaProducerActivity activity) {
|
||||
return new Dispenser(activity);
|
||||
}
|
||||
}
|
@ -1,157 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.datastax.ebdrivers.kafkaproducer;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedStringTemplate;
|
||||
import io.nosqlbench.virtdata.core.templates.StringBindings;
|
||||
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class KafkaStatement {
|
||||
private final static Logger logger = LogManager.getLogger(KafkaStatement.class);
|
||||
|
||||
private Producer<Object,Object> producer = null;
|
||||
private final StringBindings bindings;
|
||||
private final String topic;
|
||||
private final String keySerializerClass;
|
||||
private final String valueSerializerClass;
|
||||
private AvroSchema keySerializerSchema = null;
|
||||
private AvroSchema valueSerializerSchema = null;
|
||||
private final String key;
|
||||
|
||||
public KafkaStatement(OpTemplate stmtDef, String servers, String clientId, String schemaRegistryUrl) {
|
||||
ParsedStringTemplate paramTemplate = new ParsedStringTemplate(stmtDef.getStmt().orElseThrow(), stmtDef.getBindings());
|
||||
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
|
||||
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt().orElseThrow(), paramBindings);
|
||||
|
||||
this.bindings = template.resolve();
|
||||
|
||||
// Process key serializer class and schema, if any
|
||||
this.keySerializerClass =
|
||||
stmtDef.getOptionalStringParam("key_serializer_class")
|
||||
.orElse(StringSerializer.class.getName());
|
||||
|
||||
Optional<String> keySerializerSchemaFile =
|
||||
stmtDef.getOptionalStringParam("key_serializer_schema_file");
|
||||
|
||||
if (keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
|
||||
&& keySerializerSchemaFile.isEmpty() ) {
|
||||
throw new RuntimeException("KafkaAvroSerializer requires key_serializer_schema_file");
|
||||
}
|
||||
|
||||
if (keySerializerSchemaFile.isPresent()) {
|
||||
Path schemaFilePath = Path.of(keySerializerSchemaFile.get());
|
||||
try {
|
||||
this.keySerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Error reading key schema file: " + keySerializerSchemaFile, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Process value serializer class and schema, if any
|
||||
this.valueSerializerClass =
|
||||
stmtDef.getOptionalStringParam("value_serializer_class")
|
||||
.orElse(StringSerializer.class.getName());
|
||||
|
||||
Optional<String> valueSerializerSchemaFile =
|
||||
stmtDef.getOptionalStringParam("value_serializer_schema_file");
|
||||
|
||||
if (valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
|
||||
&& valueSerializerSchemaFile.isEmpty() ) {
|
||||
throw new RuntimeException("KafkaAvroSerializer requires value_serializer_schema_file");
|
||||
}
|
||||
|
||||
if (valueSerializerSchemaFile.isPresent()) {
|
||||
Path schemaFilePath = Path.of(valueSerializerSchemaFile.get());
|
||||
try {
|
||||
this.valueSerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Error reading value schema file: " + valueSerializerSchemaFile, e);
|
||||
}
|
||||
}
|
||||
|
||||
this.topic = stmtDef.getParamOrDefault("topic","default-topic");
|
||||
this.key = stmtDef.getOptionalStringParam("key").orElse("key");
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
|
||||
props.put("schema.registry.url", schemaRegistryUrl);
|
||||
|
||||
try {
|
||||
producer = new KafkaProducer<>(props);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error constructing kafka producer", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Object bindKey(long cycle) {
|
||||
Object statement = key;
|
||||
if (keySerializerClass != null &&
|
||||
keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
|
||||
try {
|
||||
statement = AvroSchemaUtils.toObject((String)statement, keySerializerSchema);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
return statement;
|
||||
}
|
||||
|
||||
private Object bindValue(long cycle) {
|
||||
Object statement = bindings.bind(cycle);
|
||||
if (valueSerializerClass != null &&
|
||||
valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
|
||||
try {
|
||||
statement = AvroSchemaUtils.toObject((String)statement, valueSerializerSchema);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
return statement;
|
||||
}
|
||||
|
||||
public void write(long cycle) {
|
||||
Object key = bindKey(cycle);
|
||||
Object value = bindValue(cycle);
|
||||
try {
|
||||
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, key, value);
|
||||
Future<RecordMetadata> send = producer.send(record);
|
||||
RecordMetadata result = send.get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
# kafkaproducer
|
||||
|
||||
This is an activity type which allows for a stream of data to be sent to a kafka topic. It is based on the stdout
|
||||
activity statement format.
|
||||
|
||||
## Parameters
|
||||
|
||||
- **topic** - The topic to write to for this activity.
|
||||
|
||||
### Examples
|
||||
|
||||
Refer to the online standard YAML documentation for a detailed walk-through.
|
||||
An example yaml is below for sending structured JSON to a kafka topic:
|
||||
|
||||
bindings:
|
||||
price: Normal(10.0D,2.0D) -> double; Save('price') -> double;
|
||||
quantity: Normal(10000.0D,100.0D); Add(-10000.0D); Save('quantity') -> double;
|
||||
total: Identity(); Expr('price * quantity') -> double;
|
||||
client: WeightedStrings('ABC_TEST:3;DFG_TEST:3;STG_TEST:14');
|
||||
clientid: HashRange(0,1000000000) -> long;
|
||||
|
||||
statements:
|
||||
- |
|
||||
\{
|
||||
"trade": \{
|
||||
"price": {price},
|
||||
"quantity": {quantity},
|
||||
"total": {total},
|
||||
"client": "{client}",
|
||||
"clientid":"{clientid}"
|
||||
\}
|
||||
\}
|
@ -1,93 +0,0 @@
|
||||
<!--
|
||||
~ Copyright (c) 2022-2023 nosqlbench
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>mvn-defaults</artifactId>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
<relativePath>../mvn-defaults</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>driver-pulsar</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
|
||||
<description>
|
||||
A Pulsar driver for nosqlbench. This provides the ability to inject synthetic data
|
||||
into a pulsar system.
|
||||
</description>
|
||||
|
||||
<properties>
|
||||
<pulsar.version>2.10.2</pulsar.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- core dependencies -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pulsar</groupId>
|
||||
<artifactId>pulsar-client</artifactId>
|
||||
<version>${pulsar.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pulsar</groupId>
|
||||
<artifactId>pulsar-client-admin</artifactId>
|
||||
<version>${pulsar.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>engine-api</artifactId>
|
||||
<version>4.17.32-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.4</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.8.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>1.11.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.12.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
Loading…
Reference in New Issue
Block a user