add initial implementation of pulsar ops

This commit is contained in:
Jonathan Shook 2021-02-01 23:42:08 -06:00
parent 0330c18a7b
commit 70709d3d9f
10 changed files with 367 additions and 83 deletions

View File

@ -37,13 +37,12 @@ public class PulsarAction implements SyncAction {
);
}
int tries = 0;
int maxTries = 1;
while (tries < maxTries) {
tries++;
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
}
// TODO: add retries and use standard error handler
return 0;
}
}

View File

@ -8,7 +8,6 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
@ -21,20 +20,15 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer bindTimer;
public Timer executeTimer;
public enum PulsarClientScope {
activity,
thread
}
private PulsarSpaceCache pulsarCache;
private NBErrorHandler errorhandler;
private String pulsarUrl;
private OpSequence<ReadyPulsarOp> sequencer;
private PulsarClientScope clientScope = PulsarClientScope.activity;
private PulsarClient activityClient;
private Supplier<PulsarClient> clientSupplier;
private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
public PulsarActivity(ActivityDef activityDef) {
super(activityDef);
@ -48,12 +42,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
executeTimer = ActivityMetrics.timer(activityDef, "execute");
pulsarUrl = activityDef.getParams().getOptionalString("url").orElse("pulsar://localhost:6650");
ScopedSupplier clientScope = ScopedSupplier.valueOf(getParams().getOptionalString("client_scope").orElse("singleton"));
this.clientSupplier = clientScope.supplier(this::newClient);
PulsarClient pulsarClient = this.clientSupplier.get();
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, this.clientSupplier));
pulsarCache = new PulsarSpaceCache(this, this::newClient);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache));
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);
@ -66,24 +56,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
this.clientScope = PulsarClientScope.valueOf(activityDef.getParams().getOptionalString("scope").orElse("activity"));
}
// public synchronized Function<Thread, PulsarClient> getClient() {
// switch (getClientScope()) {
// case thread:
// return t -> newClient();
// case activity:
// if (this.activityClient == null) {
// this.activityClient = newClient();
// }
// return t -> this.activityClient;
// default:
// throw new RuntimeException("unable to recognize client scope: " + getClientScope());
// }
//
// }
public PulsarClient newClient() {
try {
PulsarClient newClient = PulsarClient.builder().
@ -95,10 +69,6 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
}
}
private PulsarClientScope getClientScope() {
return clientScope;
}
public OpSequence<ReadyPulsarOp> getSequencer() {
return sequencer;
}
@ -106,4 +76,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer getBindTimer() {
return bindTimer;
}
public Timer getExecuteTimer() {
return this.executeTimer;
}
}

View File

@ -3,7 +3,10 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class)
public class PulsarActivityType implements ActivityType<PulsarActivity> {
@Override
@ -14,11 +17,16 @@ public class PulsarActivityType implements ActivityType<PulsarActivity> {
@Override
public ActionDispenser getActionDispenser(PulsarActivity activity) {
if (activity.getParams().getOptionalString("async").isPresent()) {
throw new RuntimeException("The async pulsar driver is not online yet.");
throw new RuntimeException("The async pulsar driver is not implemented yet.");
}
return new PulsarActionDispenser(activity);
}
@Override
public PulsarActivity getActivity(ActivityDef activityDef) {
return new PulsarActivity(activityDef);
}
private static class PulsarActionDispenser implements ActionDispenser {
private final PulsarActivity activity;

View File

@ -0,0 +1,64 @@
package io.nosqlbench.driver.pulsar;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* An instance of a pulsar client, along with all the cached objects which are normally
* associated with it during a client session in a typical application.
* A PulsarSpace is simply a named and cached set of objects which must be used together.
*/
public class PulsarSpace {
private final String name;
private final Supplier<PulsarClient> clientFunc;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private PulsarClient _client;
public PulsarSpace(String name, Supplier<PulsarClient> clientFunc) {
this.name = name;
this.clientFunc = clientFunc;
}
public PulsarClient getClient() {
if (_client == null) {
_client = clientFunc.get();
}
return _client;
}
public Producer<?> getProducer(String pname, String topicName) {
Producer<?> producer = producers.computeIfAbsent(
pname, n -> {
try {
// TODO: parameterize producer settings
return getClient().newProducer().topic(topicName).create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
);
return producer;
}
public Consumer<?> getConsumer(String pname, String topicName) {
Consumer<?> consumer = consumers.computeIfAbsent(
pname, n -> {
try {
// TODO: parameterize subscription name and other settings
return getClient().newConsumer().topic(topicName).subscriptionName("testsub").subscribe();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
);
return consumer;
}
}

View File

@ -0,0 +1,38 @@
package io.nosqlbench.driver.pulsar;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* To enable flexibility in testing methods, each object graph which is used within
* the pulsar API is kept within a single umbrella called the PulsarSpace.
* This allows for clients, producers, and consumers to remain connected and
* cached in a useful way.
*/
public class PulsarSpaceCache {
// TODO: Implement cache limits
// TODO: Implement variant cache eviction behaviors (halt, warn, LRU)
private final PulsarActivity activity;
private final Supplier<PulsarClient> clientFunc;
private final ConcurrentHashMap<String, PulsarSpace> clientScopes = new ConcurrentHashMap<>();
public PulsarSpaceCache(PulsarActivity pulsarActivity, Supplier<PulsarClient> newClient) {
this.activity = pulsarActivity;
this.clientFunc = newClient;
}
public PulsarSpace getClientSpace(String name) {
PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, clientFunc));
return cspace;
}
public PulsarActivity getActivity() {
return activity;
}
}

View File

@ -0,0 +1,36 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarRecvMapper implements LongFunction<PulsarOp> {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<String> recvInstructions;
private final CommandTemplate cmdTpl;
public PulsarRecvMapper(LongFunction<Consumer<?>> consumerFunc,
LongFunction<String> recvMsg,
CommandTemplate cmdTpl) {
this.consumerFunc = consumerFunc;
this.recvInstructions = recvMsg;
this.cmdTpl = cmdTpl;
// TODO add schema support
}
@Override
public PulsarOp apply(long value) {
return new PulsarRecvOp((Consumer<byte[]>) consumerFunc.apply(value), recvInstructions.apply(value));
}
}

View File

@ -0,0 +1,29 @@
package io.nosqlbench.driver.pulsar.ops;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;
public class PulsarRecvOp implements PulsarOp {
private final Consumer<byte[]> consumer;
private final String recvInstructions;
public PulsarRecvOp(Consumer<byte[]> consumer, String recvInstructions) {
this.consumer = consumer;
this.recvInstructions = recvInstructions;
}
@Override
public void run() {
try {
Message<byte[]> msgbytes = consumer.receive();
// TODO: Parameterize the actions taken on a received message
// TODO: Properly parameterize all pulsar Op types as with Producer<T> and Consumer<T>
System.out.println("received:" + new String(msgbytes.getValue(), StandardCharsets.UTF_8));
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,39 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarSendMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> payloadFunc;
public PulsarSendMapper(
LongFunction<Producer<?>> producerFunc,
LongFunction<String> msgFunc,
CommandTemplate cmdTpl) {
this.producerFunc = producerFunc;
this.payloadFunc = msgFunc;
this.cmdTpl = cmdTpl;
// TODO: add schema support
}
@Override
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
String msg = payloadFunc.apply(value);
return new PulsarSendOp((Producer<byte[]>) producer, msg);
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.driver.pulsar.ops;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;
public class PulsarSendOp implements PulsarOp {
private final Producer<byte[]> producer;
private final String msg;
public PulsarSendOp(Producer<byte[]> producer, String msg) {
this.producer = producer;
this.msg = msg;
}
@Override
public void run() {
try {
MessageId mid = producer.send(msg.getBytes(StandardCharsets.UTF_8));
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,13 +1,13 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.PulsarSpaceCache;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@ -15,66 +15,137 @@ public class ReadyPulsarOp {
private final CommandTemplate cmdTpl;
private final LongFunction<PulsarOp> opFunc;
private final Supplier<PulsarClient> clientFunc;
private final PulsarSpaceCache pcache;
public ReadyPulsarOp(OpTemplate opTemplate, Supplier<PulsarClient> clientFunc) {
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
this.cmdTpl = new CommandTemplate(opTemplate);
this.clientFunc = clientFunc;
this.pcache = pcache;
if (cmdTpl.isDynamic("op_scope")) {
throw new RuntimeException("op_scope must be static");
}
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
// thread local op construction or not
// this allows deferment of client construction via lambda capture
if (true) {
tlOpFunction = ThreadLocal.withInitial(this::resolve);
} else {
tlOpFunction = ThreadLocal.withInitial(() -> opFunc);
}
}
private LongFunction<PulsarOp> resolve() {
if (cmdTpl.containsKey("produce")) {
} else if (cmdTpl.containsKey("consume")) {
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
tlOpFunction = ThreadLocal.withInitial()
if (cmdTpl.isStatic("type"))
if (cmdTpl.isStatic("produce") || cmdTpl.isDynamic("produce")) {
}
// Create a pulsarOp which can be executed.
// The
public PulsarOp bind ( long value){
PulsarClient client = clientFunc.apply(Thread.currentThread());
try {
Producer<byte[]> producer = client.newProducer().topic("topic").create();
} catch (PulsarClientException e) {
e.printStackTrace();
}
Map<String, String> cmd = cmdTpl.getCommand(value);
return (Void) -> new PulsarOp() {
};
if (cmdTpl.containsKey("send") && cmdTpl.containsKey("recv")) {
throw new RuntimeException("You must specify either send or recv, but not both.");
}
if (!cmdTpl.containsKey("send") && !cmdTpl.containsKey("recv")) {
throw new RuntimeException("You must specify either send or recv, but none was provided.");
}
LongFunction<String> topic_uri_func;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
String topic_uri = cmdTpl.getStatic("topic_uri");
topic_uri_func = (l) -> topic_uri;
} else {
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (
!cmdTpl.isDynamic("persistence") // optimize topic around static piece-wise values
&& !cmdTpl.isDynamic("tenant")
&& !cmdTpl.isDynamic("namespace")
&& !cmdTpl.isDynamic("topic")
) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "default");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topic_uri_func = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
cmdTpl.getDynamicOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getDynamicOr("tenant", l, "public")
+ "/" + cmdTpl.getDynamicOr("namespace", l, "default")
+ "/" + cmdTpl.getDynamicOr("topic", l, "default");
}
LongFunction<PulsarSpace> spaceFunc;
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isStatic("client")) {
String client_name = cmdTpl.getStatic("client");
PulsarSpace clientSpace = pcache.getClientSpace(client_name);
spaceFunc = l -> clientSpace;
} else {
spaceFunc = l -> pcache.getClientSpace(cmdTpl.getDynamic("client", l));
}
} else {
spaceFunc = l -> pcache.getClientSpace("default");
}
if (cmdTpl.containsKey("send")) {
return resolveSend(spaceFunc, cmdTpl, topic_uri_func);
} else if (cmdTpl.containsKey("recv")) {
return resolveRecv(spaceFunc, cmdTpl, topic_uri_func);
} else {
throw new RuntimeException("Neither send nor recv were found in the op template.");
}
}
private LongFunction<PulsarOp> resolveRecv(
LongFunction<PulsarSpace> spaceFunc,
CommandTemplate cmdTpl,
LongFunction<String> topic_uri_func) {
LongFunction<Consumer<?>> consumerFunc;
if (cmdTpl.isStatic("consumer")) {
String consumerName = cmdTpl.getStatic("consumer");
consumerFunc = (l) -> spaceFunc.apply(l).getConsumer(consumerName, topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("consumer")) {
consumerFunc = (l) -> spaceFunc.apply(l)
.getConsumer(cmdTpl.getDynamic("consumer", l), topic_uri_func.apply(l));
} else {
consumerFunc = (l) -> spaceFunc.apply(l)
.getConsumer(topic_uri_func.apply(l), topic_uri_func.apply(l));
}
return new PulsarRecvMapper(consumerFunc, (l) -> cmdTpl.get("recv", l), cmdTpl);
}
private LongFunction<PulsarOp> resolveSend(
LongFunction<PulsarSpace> spaceFunc,
CommandTemplate cmdTpl,
LongFunction<String> topic_uri_func
) {
LongFunction<Producer<?>> producerFunc;
if (cmdTpl.isStatic("producer")) {
String producerName = cmdTpl.getStatic("producer");
producerFunc = (l) -> spaceFunc.apply(l).getProducer(producerName, topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("producer")) {
producerFunc = (l) -> spaceFunc.apply(l)
.getProducer(cmdTpl.getDynamic("producer", l), topic_uri_func.apply(l));
} else {
producerFunc = (l) -> spaceFunc.apply(l)
.getProducer(topic_uri_func.apply(l), topic_uri_func.apply(l));
}
return new PulsarSendMapper(producerFunc, (l) -> cmdTpl.get("send", l), cmdTpl);
}
// Create a pulsarOp which can be executed.
// The
public PulsarOp bind(long value) {
return opFunc.apply(value);
PulsarOp op = opFunc.apply(value);
return op;
}
}