update pulsar driver

This commit is contained in:
Jonathan Shook 2021-02-04 17:47:07 -06:00
parent 4d87e1c9a7
commit 516aee2ef5
11 changed files with 86 additions and 25 deletions

View File

@ -2,11 +2,12 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class PulsarAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(PulsarAction.class);
@ -28,8 +29,8 @@ public class PulsarAction implements SyncAction {
PulsarOp pulsarOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
ReadyPulsarOp readyPulsarOp = activity.getSequencer().get(cycle);
pulsarOp = readyPulsarOp.bind(cycle);
LongFunction<PulsarOp> readyPulsarOp = activity.getSequencer().get(cycle);
pulsarOp = readyPulsarOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
throw new RuntimeException(

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
@ -12,6 +13,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -24,7 +26,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
private NBErrorHandler errorhandler;
private String pulsarUrl;
private OpSequence<ReadyPulsarOp> sequencer;
private OpSequence<LongFunction<PulsarOp>> sequencer;
private PulsarClient activityClient;
private Supplier<PulsarSpace> clientSupplier;
@ -69,7 +71,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
}
}
public OpSequence<ReadyPulsarOp> getSequencer() {
public OpSequence<LongFunction<PulsarOp>> getSequencer() {
return sequencer;
}

View File

@ -6,7 +6,7 @@ import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class)
@Service(value = ActivityType.class, selector = "pulsar")
public class PulsarActivityType implements ActivityType<PulsarActivity> {
@Override
@ -30,7 +30,6 @@ public class PulsarActivityType implements ActivityType<PulsarActivity> {
private static class PulsarActionDispenser implements ActionDispenser {
private final PulsarActivity activity;
public PulsarActionDispenser(PulsarActivity activity) {
this.activity = activity;
}

View File

@ -25,7 +25,7 @@ public class PulsarSpaceCache {
this.clientFunc = newClient;
}
public PulsarSpace getClientSpace(String name) {
public PulsarSpace getPulsarSpace(String name) {
PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, clientFunc));
return cspace;
}

View File

@ -5,6 +5,4 @@ package io.nosqlbench.driver.pulsar.ops;
*/
public interface PulsarOp extends Runnable {
default void run() {
}
}

View File

@ -19,13 +19,16 @@ public class PulsarSendMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> payloadFunc;
private final LongFunction<String> keyFunc;
public PulsarSendMapper(
LongFunction<Producer<?>> producerFunc,
LongFunction<String> msgFunc,
LongFunction<String> keyFunc,
CommandTemplate cmdTpl) {
this.producerFunc = producerFunc;
this.payloadFunc = msgFunc;
this.keyFunc = keyFunc;
this.cmdTpl = cmdTpl;
// TODO: add schema support
}
@ -34,6 +37,7 @@ public class PulsarSendMapper implements LongFunction<PulsarOp> {
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
String msg = payloadFunc.apply(value);
return new PulsarSendOp((Producer<byte[]>) producer, msg);
String key = keyFunc != null ? keyFunc.apply(value) : null;
return new PulsarSendOp(key, (Producer<byte[]>) producer, msg);
}
}

View File

@ -9,12 +9,15 @@ import java.nio.charset.StandardCharsets;
public class PulsarSendOp implements PulsarOp {
private final Producer<byte[]> producer;
private final String msg;
private final String key;
public PulsarSendOp(Producer<byte[]> producer, String msg) {
public PulsarSendOp(String key, Producer<byte[]> producer, String msg) {
this.producer = producer;
this.msg = msg;
this.key = key;
}
// TODO: figure out how to add a key when it is non-null
@Override
public void run() {
try {

View File

@ -11,12 +11,14 @@ import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class ReadyPulsarOp {
public class ReadyPulsarOp implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final LongFunction<PulsarOp> opFunc;
private final PulsarSpaceCache pcache;
// TODO: Add docs for the command template with respect to the OpTemplate
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
this.cmdTpl = new CommandTemplate(opTemplate);
this.pcache = pcache;
@ -27,7 +29,6 @@ public class ReadyPulsarOp {
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
private LongFunction<PulsarOp> resolve() {
@ -79,15 +80,16 @@ public class ReadyPulsarOp {
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isStatic("client")) {
String client_name = cmdTpl.getStatic("client");
PulsarSpace clientSpace = pcache.getClientSpace(client_name);
PulsarSpace clientSpace = pcache.getPulsarSpace(client_name);
spaceFunc = l -> clientSpace;
} else {
spaceFunc = l -> pcache.getClientSpace(cmdTpl.getDynamic("client", l));
spaceFunc = l -> pcache.getPulsarSpace(cmdTpl.getDynamic("client", l));
}
} else {
spaceFunc = l -> pcache.getClientSpace("default");
spaceFunc = l -> pcache.getPulsarSpace("default");
}
// TODO: Add batch operation types to pulsar
if (cmdTpl.containsKey("send")) {
return resolveSend(spaceFunc, cmdTpl, topic_uri_func);
} else if (cmdTpl.containsKey("recv")) {
@ -129,6 +131,7 @@ public class ReadyPulsarOp {
if (cmdTpl.isStatic("producer")) {
String producerName = cmdTpl.getStatic("producer");
producerFunc = (l) -> spaceFunc.apply(l).getProducer(producerName, topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("producer")) {
producerFunc = (l) -> spaceFunc.apply(l)
.getProducer(cmdTpl.getDynamic("producer", l), topic_uri_func.apply(l));
@ -137,15 +140,22 @@ public class ReadyPulsarOp {
.getProducer(topic_uri_func.apply(l), topic_uri_func.apply(l));
}
return new PulsarSendMapper(producerFunc, (l) -> cmdTpl.get("send", l), cmdTpl);
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("key")) {
String keyName = cmdTpl.getStatic("key");
keyFunc = (l) -> keyName;
} else if (cmdTpl.isDynamic("key")) {
keyFunc = (l) -> cmdTpl.getDynamic("key", l);
} else {
keyFunc = null;
}
return new PulsarSendMapper(producerFunc, (l) -> cmdTpl.get("send", l), keyFunc, cmdTpl);
}
// Create a pulsarOp which can be executed.
// The
public PulsarOp bind(long value) {
@Override
public PulsarOp apply(long value) {
PulsarOp op = opFunc.apply(value);
return op;
}
}

View File

@ -3,16 +3,24 @@
This driver allows you to produce and consume Apache Pulsar messages with
NoSQLBench.
## Issues Tracker
If you have issues or new requirements for this driver, please add them at
the
[pulsar issues tracker](
https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
## Example Statements
The simplest pulsar statement looks like this:
```yaml
statement: send='{this is a test message}'
statement: send='{"msg":"test message"}'
```
In this example, the statement is sent by a producer with a default
_topic_uri_ of `persistent://public/default/default`.
_topic_uri_ of `persistent://public/default/default` at at the pulsar
endpoint `pulsar://localhost:6650`
A complete example which uses all the available fields:
@ -71,6 +79,32 @@ runtime. This is enabled directly by using nominative variables for
instance names where needed. When the instance names are not provided for
an operation, defaults are used to emulate a simple configuration.
Since this is a new capability in a NoSQLBench driver, how it works is
explained below:
When a pulsar cycles is executed, the operation is synthesized from the op
template fields as explained below under _Op Fields_. This happens in a
specific order:
1. The client instance name is resolved. If a `client` field is provided,
this is taken as the client instance name. If not, it is set
to `default`.
2. The named client instance is fetched from the cache, or created and
cached if it does not yet exist.
3. The topic_uri is resolved. This is the value to be used with
`.topic(...)` calls in the API. The op fields below explain how to
control this value.
4. For _send_ operations, a producer is named and created if needed. By
default, the producer is named after the topic_uri above. You can
override this by providing a value for `producer`.
5. For _recv_ operations, a consumer is named and created if needed. By
default, the consumer is named after the topic_uri above. You can
override this by providing a value for `consumer`.
The most important detail for understanding the instancing controls is
that clients, producers, and consumers are all named and cached in the
specific order above.
## Op Fields
Thees fields are used to define of a single pulsar client operation. These
@ -138,6 +172,7 @@ for Apache Pulsar such as clients, producers, and consumers.
## Activity Parameters
- **url** - The pulsar url to connect to.
- **default** - `url=pulsar://localhost:6650`
- **maxcached** - A default value to be applied to `max_clients`,
`max_producers`, `max_consumers`.
- default: `max_cached=100`

View File

@ -0,0 +1,3 @@
# pulsar help topics
- pulsar

View File

@ -125,6 +125,12 @@
<version>4.15.12-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-pulsar</artifactId>
<version>4.15.12-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.nosqlbench</groupId>-->
<!-- <artifactId>nb-runtime</artifactId>-->