Merge pull request #208 from ncarvind/avro

Changes for dealing with Avro serialization of Kafka messages
Jonathan Shook 2020-09-23 20:49:33 -05:00 committed by GitHub
commit 22ac6cf6c3
5 changed files with 290 additions and 53 deletions
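
For reviewers who want the gist before reading the diff: the new KafkaStatement boils down to the standard Confluent producer pattern sketched below. The serializer class name and the schema.registry.url property are taken from this change; the bootstrap address, topic, key, and inline schema are illustrative placeholders, so treat this as a rough sketch of the mechanism rather than the driver's exact code.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The Avro serializer registers and looks up schemas through the registry.
        props.put("schema.registry.url", "http://localhost:8081");

        // Illustrative schema; the driver instead loads schemas from the
        // key_serializer_schema_file / value_serializer_schema_file statement params.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                        + "[{\"name\":\"field1\",\"type\":\"string\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("field1", "hello");

        try (Producer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "example-key", value)).get();
        }
    }
}
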

pom.xml

@@ -27,6 +27,20 @@
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
@@ -46,7 +60,17 @@
</dependency>
</dependencies>
<repositories>
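<!-- io.confluent artifacts such as kafka-avro-serializer are published to Confluent's own
     repository rather than Maven Central, hence the extra repository entry below. -->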
<repository>
<id>confluent</id>
<name>Confluent Maven Repo</name>
<layout>default</layout>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<!-- <profiles>-->
<!-- <profile>-->
<!-- <id>shade</id>-->

KafkaAction.java

@@ -0,0 +1,34 @@
package com.datastax.ebdrivers.kafkaproducer;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaAction implements SyncAction {
private final static Logger logger = LoggerFactory.getLogger(KafkaAction.class);
private final KafkaProducerActivity activity;
private final int slot;
private OpSequence<KafkaStatement> sequencer;
public KafkaAction(KafkaProducerActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
}
@Override
public void init() {
this.sequencer = activity.getOpSequencer();
}
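// Each cycle looks up the op planned for that cycle in the shared sequence
// and synchronously produces one record via KafkaStatement.write(cycle).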
@Override
public int runCycle(long cycleValue) {
sequencer.get(cycleValue).write(cycleValue);
return 1;
}
}

KafkaProducerActivity.java

@@ -1,68 +1,108 @@
package com.datastax.ebdrivers.kafkaproducer;
import io.nosqlbench.activitytype.stdout.StdoutActivity;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class KafkaProducerActivity extends StdoutActivity {
+public class KafkaProducerActivity extends SimpleActivity {
private final static Logger logger = LoggerFactory.getLogger(KafkaProducerActivity.class);
private Producer<Long,String> producer = null;
private String topic;
private String yamlLoc;
private String clientId;
private String servers;
private OpSequence<KafkaStatement> opSequence;
private String schemaRegistryUrl;
Timer resultTimer;
Timer resultSuccessTimer;
public KafkaProducerActivity(ActivityDef activityDef) {
super(activityDef);
}
public synchronized Producer<Long,String> getKafkaProducer() {
if (producer!=null) {
return producer;
}
Properties props = new Properties();
String servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
.orElse("localhost" + ":9092")
.split(","))
.map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
.collect(Collectors.joining(","));
String clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
.orElse("TestProducerClientId");
String key_serializer =
activityDef.getParams().getOptionalString("key_serializer").orElse(LongSerializer.class.getName());
String value_serializer =
activityDef.getParams().getOptionalString("value_serializer").orElse(StringSerializer.class.getName());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, key_serializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value_serializer);
producer = new KafkaProducer<>(props);
return producer;
}
@Override
public synchronized void write(String statement) {
Producer<Long, String> kafkaProducer = getKafkaProducer();
ProducerRecord<Long, String> record = new ProducerRecord<>(topic, statement);
Future<RecordMetadata> send = kafkaProducer.send(record);
try {
RecordMetadata result = send.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
-public void onActivityDefUpdate(ActivityDef activityDef) {
-this.topic = activityDef.getParams().getOptionalString("topic").orElse("default-topic");
+public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
// sanity check
yamlLoc = activityDef.getParams().getOptionalString("yaml", "workload")
.orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));
servers = Arrays.stream(activityDef.getParams().getOptionalString("host","hosts")
.orElse("localhost" + ":9092")
.split(","))
.map(x -> x.indexOf(':') == -1 ? x + ":9092" : x)
.collect(Collectors.joining(","));
clientId = activityDef.getParams().getOptionalString("clientid","client.id","client_id")
.orElse("TestProducerClientId");
schemaRegistryUrl = activityDef.getParams()
.getOptionalString("schema_registry_url", "schema.registry.url")
.orElse("http://localhost:8081");
}
@Override
public void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
onActivityDefUpdate(activityDef);
opSequence = initOpSequencer();
setDefaultsFromOpSequence(opSequence);
resultTimer = ActivityMetrics.timer(activityDef, "result");
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
}
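// Builds the op sequence: load the workload yaml, filter op templates by the 'tags' param,
// and add one KafkaStatement per template, weighted by its 'ratio' parameter.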
private OpSequence<KafkaStatement> initOpSequencer() {
SequencerType sequencerType = SequencerType.valueOf(
getParams().getOptionalString("seq").orElse("bucket")
);
SequencePlanner<KafkaStatement> sequencer = new SequencePlanner<>(sequencerType);
String tagFilter = activityDef.getParams().getOptionalString("tags").orElse("");
StmtsDocList stmtsDocList = StatementsLoader.loadPath(logger, yamlLoc, new StrInterpolator(activityDef),
"activities");
List<OpTemplate> statements = stmtsDocList.getStmts(tagFilter);
String format = getParams().getOptionalString("format").orElse(null);
if (statements.size() > 0) {
for (OpTemplate statement : statements) {
sequencer.addOp(
new KafkaStatement(statement,
servers,
clientId,
schemaRegistryUrl),
statement.getParamOrDefault("ratio", 1)
);
}
} else {
logger.error("Unable to create a Kafka statement if you have no active statements.");
}
return sequencer.resolve();
}
protected OpSequence<KafkaStatement> getOpSequencer() {
return opSequence;
}
}

KafkaProducerActivityType.java

@@ -1,7 +1,5 @@
package com.datastax.ebdrivers.kafkaproducer;
-import io.nosqlbench.activitytype.stdout.StdoutAction;
-import io.nosqlbench.activitytype.stdout.StdoutActivity;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
@@ -21,15 +19,15 @@ public class KafkaProducerActivityType implements ActivityType<KafkaProducerActivity> {
}
private static class Dispenser implements ActionDispenser {
-private StdoutActivity activity;
+private KafkaProducerActivity activity;
-private Dispenser(StdoutActivity activity) {
+private Dispenser(KafkaProducerActivity activity) {
this.activity = activity;
}
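// One KafkaAction is dispensed per thread slot; all of them share the activity's op sequence.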
@Override
public Action getAction(int slot) {
-return new StdoutAction(slot,this.activity);
+return new KafkaAction(this.activity, slot);
}
}

KafkaStatement.java

@@ -0,0 +1,141 @@
package com.datastax.ebdrivers.kafkaproducer;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import io.nosqlbench.virtdata.core.templates.StringBindingsTemplate;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaStatement {
private final static Logger logger = LoggerFactory.getLogger(KafkaStatement.class);
private Producer<Object,Object> producer = null;
private final StringBindings bindings;
private final String topic;
private final String keySerializerClass;
private final String valueSerializerClass;
private AvroSchema keySerializerSchema = null;
private AvroSchema valueSerializerSchema = null;
private final String key;
public KafkaStatement(OpTemplate stmtDef, String servers, String clientId, String schemaRegistryUrl) {
ParsedTemplate paramTemplate = new ParsedTemplate(stmtDef.getStmt(), stmtDef.getBindings());
BindingsTemplate paramBindings = new BindingsTemplate(paramTemplate.getBindPoints());
StringBindingsTemplate template = new StringBindingsTemplate(stmtDef.getStmt(), paramBindings);
this.bindings = template.resolve();
// Process key serializer class and schema, if any
this.keySerializerClass =
stmtDef.getOptionalStringParam("key_serializer_class")
.orElse(StringSerializer.class.getName());
Optional<String> keySerializerSchemaFile =
stmtDef.getOptionalStringParam("key_serializer_schema_file");
if (keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
&& keySerializerSchemaFile.isEmpty() ) {
throw new RuntimeException("KafkaAvroSerializer requires key_serializer_schema_file");
}
if (keySerializerSchemaFile.isPresent()) {
Path schemaFilePath = Path.of(keySerializerSchemaFile.get());
try {
this.keySerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
} catch (IOException e) {
throw new RuntimeException("Error reading key schema file: " + keySerializerSchemaFile, e);
}
}
// Process value serializer class and schema, if any
this.valueSerializerClass =
stmtDef.getOptionalStringParam("value_serializer_class")
.orElse(StringSerializer.class.getName());
Optional<String> valueSerializerSchemaFile =
stmtDef.getOptionalStringParam("value_serializer_schema_file");
if (valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")
&& valueSerializerSchemaFile.isEmpty() ) {
throw new RuntimeException("KafkaAvroSerializer requires value_serializer_schema_file");
}
if (valueSerializerSchemaFile.isPresent()) {
Path schemaFilePath = Path.of(valueSerializerSchemaFile.get());
try {
this.valueSerializerSchema = new AvroSchema(Files.readString(schemaFilePath));
} catch (IOException e) {
throw new RuntimeException("Error reading value schema file: " + valueSerializerSchemaFile, e);
}
}
this.topic = stmtDef.getParamOrDefault("topic","default-topic");
this.key = stmtDef.getOptionalStringParam("key").orElse("key");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
props.put("schema.registry.url", schemaRegistryUrl);
try {
producer = new KafkaProducer<>(props);
} catch (Exception e) {
logger.error("Error constructing kafka producer", e);
}
}
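// When KafkaAvroSerializer is selected for the key or value, the bound string is converted into an
// Avro object against the corresponding schema before being handed to the producer (AvroSchemaUtils
// expects the JSON encoding of the datum); otherwise the bound string is sent as-is.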
private Object bindKey(long cycle) {
Object statement = key;
if (keySerializerClass != null &&
keySerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
try {
statement = AvroSchemaUtils.toObject((String)statement, keySerializerSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return statement;
}
private Object bindValue(long cycle) {
Object statement = bindings.bind(cycle);
if (valueSerializerClass != null &&
valueSerializerClass.equals("io.confluent.kafka.serializers.KafkaAvroSerializer")) {
try {
statement = AvroSchemaUtils.toObject((String)statement, valueSerializerSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return statement;
}
public void write(long cycle) {
Object key = bindKey(cycle);
Object value = bindValue(cycle);
try {
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> send = producer.send(record);
RecordMetadata result = send.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}