diff --git a/driver-kafka/pom.xml b/driver-kafka/pom.xml
index 016af05f3..08f53869f 100644
--- a/driver-kafka/pom.xml
+++ b/driver-kafka/pom.xml
@@ -27,6 +27,20 @@
             <version>2.0.0</version>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.10.0</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>5.5.1</version>
+        </dependency>
+
         <dependency>
             <groupId>io.nosqlbench</groupId>
             <artifactId>engine-api</artifactId>
@@ -46,7 +60,17 @@
-
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>Confluent Maven Repo</name>
+            <layout>default</layout>
+            <url>https://packages.confluent.io/maven/</url>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
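Note: the repository entry above is required because kafka-avro-serializer is published to Confluent's Maven repository (packages.confluent.io), not to Maven Central; the avro artifact itself resolves from Central.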
diff --git a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaAction.java b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaAction.java
new file mode 100644
index 000000000..f01b3e82b
--- /dev/null
+++ b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaAction.java
@@ -0,0 +1,34 @@
+package com.datastax.ebdrivers.kafkaproducer;
+
+import io.nosqlbench.engine.api.activityapi.core.SyncAction;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaAction implements SyncAction {
+
+ private final static Logger logger = LoggerFactory.getLogger(KafkaAction.class);
+
+ private final KafkaProducerActivity activity;
+ private final int slot;
+
+ private OpSequence<KafkaStatement> sequencer;
+
+ public KafkaAction(KafkaProducerActivity activity, int slot) {
+ this.activity = activity;
+ this.slot = slot;
+ }
+
+ @Override
+ public void init() {
+ this.sequencer = activity.getOpSequencer();
+ }
+
+ @Override
+ public int runCycle(long cycleValue) {
+ sequencer.get(cycleValue).write(cycleValue);
+ return 1;
+ }
+
+}
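Note: KafkaAction is the per-slot SyncAction for this driver; on each cycle it looks up the KafkaStatement that the op sequence planned for that cycle value and synchronously writes one record.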
diff --git a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivity.java b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivity.java
index 907fa786a..30189c531 100644
--- a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivity.java
+++ b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivity.java
@@ -1,68 +1,108 @@
package com.datastax.ebdrivers.kafkaproducer;
-import io.nosqlbench.activitytype.stdout.StdoutActivity;
+import com.codahale.metrics.Timer;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
+import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
+import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
+import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
+import io.nosqlbench.engine.api.metrics.ActivityMetrics;
+import io.nosqlbench.engine.api.templating.StrInterpolator;
+import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
+import io.nosqlbench.virtdata.core.templates.StringBindings;
+import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class KafkaProducerActivity extends StdoutActivity {
+public class KafkaProducerActivity extends SimpleActivity {
private final static Logger logger = LoggerFactory.getLogger(KafkaProducerActivity.class);
- private Producer<Long,String> producer = null;
- private String topic;
+ private String yamlLoc;
+ private String clientId;
+ private String servers;
+ private OpSequence<KafkaStatement> opSequence;
+ private String schemaRegistryUrl;
+ Timer resultTimer;
+ Timer resultSuccessTimer;
+
public KafkaProducerActivity(ActivityDef activityDef) {
super(activityDef);
}
- public synchronized Producer<Long,String> getKafkaProducer() {
- if (producer!=null) {
- return producer;
- }
- Properties props = new Properties();
- String servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
- .orElse("localhost" + ":9092")
- .split(","))
- .map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
- .collect(Collectors.joining(","));
- String clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
- .orElse("TestProducerClientId");
- String key_serializer =
- activityDef.getParams().getOptionalString("key_serializer").orElse(LongSerializer.class.getName());
- String value_serializer =
- activityDef.getParams().getOptionalString("value_serializer").orElse(StringSerializer.class.getName());
-
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, key_serializer);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value_serializer);
-
- producer = new KafkaProducer<>(props);
- return producer;
- }
-
@Override
- public synchronized void write(String statement) {
- Producer<Long,String> kafkaProducer = getKafkaProducer();
- ProducerRecord<Long,String> record = new ProducerRecord<>(topic, statement);
- Future<RecordMetadata> send = kafkaProducer.send(record);
- try {
- RecordMetadata result = send.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onActivityDefUpdate(ActivityDef activityDef) {
- this.topic = activityDef.getParams().getOptionalString("topic").orElse("default-topic");
+ public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
+
+ // sanity check
+ yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
+ .orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
+ servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
+ .orElse("localhost" + ":9092")
+ .split(","))
+ .map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
+ .collect(Collectors.joining(","));
+ clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
+ .orElse("TestProducerClientId");
+ schemaRegistryUrl = activityDef.getParams()
+ .getOptionalString("schema_registry_url", "schema.registry.url")
+ .orElse("http://localhost:8081");
+ }
+
+ @Override
+ public void initActivity() {
+ logger.debug("initializing activity: " + this.activityDef.getAlias());
+ onActivityDefUpdate(activityDef);
+
+ opSequence = initOpSequencer();
+ setDefaultsFromOpSequence(opSequence);
+
+ resultTimer = ActivityMetrics.timer(activityDef, "result");
+ resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
+ }
+
+ private OpSequence<KafkaStatement> initOpSequencer() {
+ SequencerType sequencerType = SequencerType.valueOf(
+ getParams().getOptionalString("seq").orElse("bucket")
+ );
+ SequencePlanner<KafkaStatement> sequencer = new SequencePlanner<>(sequencerType);
+
+ String tagFilter = activityDef.getParams().getOptionalString("tags").orElse("");
+ StmtsDocList stmtsDocList = StatementsLoader.loadPath(logger, yamlLoc, new StrInterpolator(activityDef),
+ "activities");
+ List<OpTemplate> statements = stmtsDocList.getStmts(tagFilter);
+
+ String format = getParams().getOptionalString("format").orElse(null);
+
+ if (statements.size() > 0) {
+ for (OpTemplate statement : statements) {
+ sequencer.addOp(
+ new KafkaStatement(statement,
+ servers,
+ clientId,
+ schemaRegistryUrl),
+ statement.getParamOrDefault("ratio", 1)
+ );
+ }
+ } else {
+ logger.error("Unable to create a Kafka statement if you have no active statements.");
+ }
+
+ return sequencer.resolve();
+ }
+
+ protected OpSequence<KafkaStatement> getOpSequencer() {
+ return opSequence;
}
}
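Note: a minimal workload sketch for the loader above; the names, binding, and file path are illustrative, not part of this PR. initOpSequencer() reads statements like these from the file given by the yaml/workload activity parameter, and KafkaStatement reads the per-statement params (topic, ratio, serializer settings):

    bindings:
      cycle: Identity()
    statements:
      - name: produce-avro
        stmt: >
          {"field1":"value-{cycle}"}
        params:
          topic: avro-topic
          value_serializer_class: io.confluent.kafka.serializers.KafkaAvroSerializer
          value_serializer_schema_file: ./value-schema.avsc
          ratio: 1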
diff --git a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivityType.java b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivityType.java
index ad62a29a4..b7a6a12bc 100644
--- a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivityType.java
+++ b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivityType.java
@@ -1,7 +1,5 @@
package com.datastax.ebdrivers.kafkaproducer;
-import io.nosqlbench.activitytype.stdout.StdoutAction;
-import io.nosqlbench.activitytype.stdout.StdoutActivity;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
@@ -21,15 +19,15 @@ public class KafkaProducerActivityType implements ActivityType<KafkaProducerActivity> {
}

private static class Dispenser implements ActionDispenser {
- private StdoutActivity activity;
+ private KafkaProducerActivity activity;

- private Dispenser(StdoutActivity activity) {
+ private Dispenser(KafkaProducerActivity activity) {
this.activity = activity;
}

@Override
public Action getAction(int slot) {
- return new StdoutAction(slot, this.activity);
+ return new KafkaAction(this.activity, slot);
}
}
diff --git a/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaStatement.java b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaStatement.java
new file mode 100644
--- /dev/null
+++ b/driver-kafka/src/main/java/com/datastax/ebdrivers/kafkaproducer/KafkaStatement.java
@@ -0,0 +1,141 @@
+package com.datastax.ebdrivers.kafkaproducer;
+
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
+import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
+import io.nosqlbench.virtdata.core.templates.StringBindings;
+import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+public class KafkaStatement {
+ private final static Logger logger = LoggerFactory.getLogger(KafkaStatement.class);
+
+ private Producer<Object,Object> producer = null;
+ private final StringBindings bindings;
+ private final String topic;
+ private final String keySerializerClass;
+ private final String valueSerializerClass;
+ private AvroSchema keySerializerSchema = null;
+ private AvroSchema valueSerializerSchema = null;
+ private final String key;
+
+ public KafkaStatement(OpTemplate stmtDef, String servers, String clientId, String schemaRegistryUrl) {
+ ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt(), stmtDef.getBindings());
+ BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
+ StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt(), paramBindings);
+
+ this.bindings = template.resolve();
+
+ // Process key serializer class and schema, if any
+ this.keySerializerClass =
+ stmtDef.getOptionalStringParam("key_serializer_class")
+ .orElse(StringSerializer.class.getName());
+
+ Optional<String> keySerializerSchemaFile =
+ stmtDef.getOptionalStringParam("key_serializer_schema_file");
+
+ if (keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
+ && keySerializerSchemaFile.isEmpty() ) {
+ throw new RuntimeException("KafkaAvroSerializer requires key_serializer_schema_file");
+ }
+
+ if (keySerializerSchemaFile.isPresent()) {
+ Path schemaFilePath = Path.of(keySerializerSchemaFile.get());
+ try {
+ this.keySerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading key schema file: " + keySerializerSchemaFile, e);
+ }
+ }
+
+ // Process value serializer class and schema, if any
+ this.valueSerializerClass =
+ stmtDef.getOptionalStringParam("value_serializer_class")
+ .orElse(StringSerializer.class.getName());
+
+ Optional<String> valueSerializerSchemaFile =
+ stmtDef.getOptionalStringParam("value_serializer_schema_file");
+
+ if (valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
+ && valueSerializerSchemaFile.isEmpty() ) {
+ throw new RuntimeException("KafkaAvroSerializer requires value_serializer_schema_file");
+ }
+
+ if (valueSerializerSchemaFile.isPresent()) {
+ Path schemaFilePath = Path.of(valueSerializerSchemaFile.get());
+ try {
+ this.valueSerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading value schema file: " + valueSerializerSchemaFile, e);
+ }
+ }
+
+ this.topic = stmtDef.getParamOrDefault("topic","default-topic");
+ this.key = stmtDef.getOptionalStringParam("key").orElse("key");
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ props.put("schema.registry.url", schemaRegistryUrl);
+
+ try {
+ producer = new KafkaProducer<>(props);
+ } catch (Exception e) {
+ logger.error("Error constructing kafka producer", e);
+ }
+ }
+
+ private Object bindKey(long cycle) {
+ Object statement = key;
+ if (keySerializerClass != null &&
+ keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
+ try {
+ statement = AvroSchemaUtils.toObject((String)statement, keySerializerSchema);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return statement;
+ }
+
+ private Object bindValue(long cycle) {
+ Object statement = bindings.bind(cycle);
+ if (valueSerializerClass != null &&
+ valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
+ try {
+ statement = AvroSchemaUtils.toObject((String)statement, valueSerializerSchema);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return statement;
+ }
+
+ public void write(long cycle) {
+ Object key = bindKey(cycle);
+ Object value = bindValue(cycle);
+ try {
+ ProducerRecord<Object,Object> record = new ProducerRecord<>(topic, key, value);
+ Future<RecordMetadata> send = producer.send(record);
+ send.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
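Note: a hypothetical schema file for the key_serializer_schema_file / value_serializer_schema_file params (record and field names are illustrative). AvroSchemaUtils.toObject() parses each bound statement, which must be JSON matching this schema, into the Avro object that KafkaAvroSerializer then sends:

    {
      "type": "record",
      "name": "SampleValue",
      "namespace": "com.datastax.ebdrivers.kafkaproducer",
      "fields": [
        { "name": "field1", "type": "string" }
      ]
    }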