minor refinements to kafka producer

Jonathan Shook 2020-05-27 13:30:49 -05:00
parent 4b84a31520
commit f22407460d
8 changed files with 233 additions and 1 deletion
driver-http
driver-kafka
    pom.xml
    src/main
        java/com/datastax/ebdrivers/kafkaproducer
        resources
engine-docs/src/main/resources/docs-for-nb/designing_workloads
nb
    pom.xml
pom.xml


@@ -18,6 +18,7 @@
</description>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>

driver-kafka/pom.xml (new file, 85 lines)

@@ -0,0 +1,85 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>mvn-defaults</artifactId>
        <groupId>io.nosqlbench</groupId>
        <version>3.12.115-SNAPSHOT</version>
        <relativePath>../mvn-defaults</relativePath>
    </parent>

    <artifactId>driver-kafka</artifactId>
    <packaging>jar</packaging>
    <name>${project.artifactId}</name>

    <description>
        A Kafka driver for nosqlbench. This provides the ability to inject synthetic data
        into a Kafka topic.
    </description>

    <dependencies>

        <!-- core dependencies -->

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>io.nosqlbench</groupId>
            <artifactId>engine-api</artifactId>
            <version>3.12.115-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>io.nosqlbench</groupId>
            <artifactId>driver-stdout</artifactId>
            <version>3.12.115-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- test only scope -->

        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.13.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core-java8</artifactId>
            <version>1.0.0m1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

<!--    <profiles>-->
<!--        <profile>-->
<!--            <id>shade</id>-->
<!--            <activation>-->
<!--                <activeByDefault>true</activeByDefault>-->
<!--            </activation>-->
<!--            <build>-->
<!--                <plugins>-->
<!--                    <plugin>-->
<!--                        <artifactId>maven-shade-plugin</artifactId>-->
<!--                        <configuration>-->
<!--                            <finalName>${project.artifactId}</finalName>-->
<!--                        </configuration>-->
<!--                    </plugin>-->
<!--                </plugins>-->
<!--            </build>-->
<!--        </profile>-->
<!--    </profiles>-->

</project>


@@ -0,0 +1,68 @@
package com.datastax.ebdrivers.kafkaproducer;

import io.nosqlbench.activitytype.stdout.StdoutActivity;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class KafkaProducerActivity extends StdoutActivity {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerActivity.class);
    private Producer<Long, String> producer = null;
    private String topic;

    public KafkaProducerActivity(ActivityDef activityDef) {
        super(activityDef);
    }

    public synchronized Producer<Long, String> getKafkaProducer() {
        if (producer != null) {
            return producer;
        }
        Properties props = new Properties();
        // Accept either 'host' or 'hosts', and append the default Kafka port
        // to any server entry that does not specify one.
        String servers = Arrays.stream(activityDef.getParams().getOptionalString("host", "hosts")
            .orElse("localhost:9092")
            .split(","))
            .map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
            .collect(Collectors.joining(","));
        String clientId = activityDef.getParams().getOptionalString("clientid", "client.id", "client_id")
            .orElse("TestProducerClientId");
        String keySerializer = activityDef.getParams().getOptionalString("key_serializer")
            .orElse(LongSerializer.class.getName());
        String valueSerializer = activityDef.getParams().getOptionalString("value_serializer")
            .orElse(StringSerializer.class.getName());
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        producer = new KafkaProducer<>(props);
        return producer;
    }

    @Override
    public synchronized void write(String statement) {
        Producer<Long, String> kafkaProducer = getKafkaProducer();
        // The rendered statement becomes the record value; no record key is set.
        ProducerRecord<Long, String> record = new ProducerRecord<>(topic, statement);
        Future<RecordMetadata> send = kafkaProducer.send(record);
        try {
            // Block until the broker acknowledges the record, surfacing any send error.
            send.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onActivityDefUpdate(ActivityDef activityDef) {
        this.topic = activityDef.getParams().getOptionalString("topic").orElse("default-topic");
        super.onActivityDefUpdate(activityDef);
    }
}
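
One detail worth calling out in getKafkaProducer(): any server in the host/hosts parameter given without an explicit port has the default Kafka port 9092 appended. A minimal standalone sketch of that normalization (hypothetical class and method names, not part of this commit):

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class HostNormalizationSketch {
        // Mirrors the stream pipeline in getKafkaProducer(): entries without
        // a ':' get ":9092" appended; entries with a port are left as-is.
        static String normalize(String hosts) {
            return Arrays.stream(hosts.split(","))
                .map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
                .collect(Collectors.joining(","));
        }

        public static void main(String[] args) {
            // Prints: node1:9092,node2:9095
            System.out.println(normalize("node1,node2:9095"));
        }
    }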


@@ -0,0 +1,40 @@
package com.datastax.ebdrivers.kafkaproducer;

import io.nosqlbench.activitytype.stdout.StdoutAction;
import io.nosqlbench.activitytype.stdout.StdoutActivity;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;

@Service(ActivityType.class)
public class KafkaProducerActivityType implements ActivityType<KafkaProducerActivity> {

    @Override
    public String getName() {
        return "kafkaproducer";
    }

    @Override
    public KafkaProducerActivity getActivity(ActivityDef activityDef) {
        return new KafkaProducerActivity(activityDef);
    }

    // Reuses the stdout action; the activity's overridden write(...) routes
    // each rendered statement to Kafka instead of standard output.
    private static class Dispenser implements ActionDispenser {
        private final StdoutActivity activity;

        private Dispenser(StdoutActivity activity) {
            this.activity = activity;
        }

        @Override
        public Action getAction(int slot) {
            return new StdoutAction(slot, this.activity);
        }
    }

    @Override
    public ActionDispenser getActionDispenser(KafkaProducerActivity activity) {
        return new Dispenser(activity);
    }
}
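
The @Service(ActivityType.class) annotation is what makes "kafkaproducer" visible to the engine. Assuming it is published through the standard java.util.ServiceLoader mechanism (as nosqlbench's other drivers appear to be), discovery can be pictured with a sketch like this:

    import java.util.ServiceLoader;
    import io.nosqlbench.engine.api.activityapi.core.ActivityType;

    public class ActivityTypeLookupSketch {
        public static void main(String[] args) {
            // List every ActivityType on the classpath; "kafkaproducer" should
            // appear here once driver-kafka is included in the build.
            for (ActivityType<?> type : ServiceLoader.load(ActivityType.class)) {
                System.out.println(type.getName());
            }
        }
    }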


@@ -0,0 +1,32 @@
# kafkaproducer

This activity type sends a stream of generated data to a Kafka topic. It is based on the stdout
activity statement format.

## Parameters

- **topic** - The topic to write to for this activity.

### Examples

Refer to the online standard YAML documentation for a detailed walk-through.
An example yaml for sending structured JSON to a Kafka topic is shown below:
    bindings:
      price: Normal(10.0D,2.0D) -> double; Save('price') -> double;
      quantity: Normal(10000.0D,100.0D); Add(-10000.0D); Save('quantity') -> double;
      total: Identity(); Expr('price * quantity') -> double;
      client: WeightedStrings('ABC_TEST:3;DFG_TEST:3;STG_TEST:14');
      clientid: HashRange(0,1000000000) -> long;
    statements:
      - |
        \{
          "trade": \{
            "price": {price},
            "quantity": {quantity},
            "total": {total},
            "client": "{client}",
            "clientid": "{clientid}"
          \}
        \}
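
To spot-check that documents actually landed on the topic, a minimal read-back consumer can be used. The sketch below is illustrative only (not part of this driver); it assumes the same kafka-clients 2.0.0 API, a broker at localhost:9092, and the driver's default topic name:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TopicSpotCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-spot-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props)) {
                // "default-topic" is what the driver uses when no topic parameter is set.
                consumer.subscribe(Collections.singletonList("default-topic"));
                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<Long, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        }
    }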


@@ -224,7 +224,7 @@ as any other parameter depending on the assignment operators as explained above.
### alias
The `alias` parameter is, by default, set to the expanded name of WORKLOAD_SCENARIO_STEP, which means that each activity
within the scenario has a distinct and symbolic name. This is important for distinguishing metrics from one another
across workloads, named scenarios, and steps within a named scenario. The above words are interpolated into the alias as
follows:


@@ -27,6 +27,11 @@
<version>3.12.115-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-kafka</artifactId>
<version>3.12.115-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>


@@ -43,6 +43,7 @@
<module>driver-cql-shaded</module>
<module>driver-cqlverify</module>
<module>driver-web</module>
<module>driver-kafka</module>
<!-- VIRTDATA MODULES -->