Add Pulsar JMS driver

This commit is contained in:
yabinmeng-gitee 2021-04-30 17:33:57 -05:00
parent 6c9b79993f
commit f75ccc1b09
3 changed files with 21 additions and 30 deletions

View File

@ -16,24 +16,20 @@ public class PulsarJmsAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class);
private final ActivityDef activityDef;
private final PulsarJmsActivity activity;
private final int slot;
private final PulsarJmsActivity activity;
private OpSequence<OpDispenser<PulsarJmsOp>> sequencer;
int maxTries;
int maxTries = 1;
public PulsarJmsAction(ActivityDef activityDef, int slot, PulsarJmsActivity activity) {
this.activityDef = activityDef;
this.slot = slot;
public PulsarJmsAction(PulsarJmsActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
public void init() {
this.sequencer = activity.getSequencer();
}
@Override
@ -45,7 +41,7 @@ public class PulsarJmsAction implements SyncAction {
PulsarJmsOp pulsarJmsOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
LongFunction<PulsarJmsOp> readyPulsarJmsOp = sequencer.get(cycle);
LongFunction<PulsarJmsOp> readyPulsarJmsOp = activity.getSequencer().get(cycle);
pulsarJmsOp = readyPulsarJmsOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...

View File

@ -1,17 +0,0 @@
package io.nosqlbench.driver.pularjms;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
public class PulsarJmsActionDispenser implements ActionDispenser {
private final PulsarJmsActivity activity;
public PulsarJmsActionDispenser(PulsarJmsActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new PulsarJmsAction(activity.getActivityDef(),slot,activity);
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pularjms;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@ -7,14 +8,25 @@ import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "pulsar_jms")
public class PulsarJmsActivityType implements ActivityType<PulsarJmsActivity> {
@Override
public ActionDispenser getActionDispenser(PulsarJmsActivity activity) {
return new PulsarJmsActionDispenser(activity);
}
@Override
public PulsarJmsActivity getActivity(ActivityDef activityDef) {
return new PulsarJmsActivity(activityDef);
}
@Override
public ActionDispenser getActionDispenser(PulsarJmsActivity activity) {
return new PulsarJmsActionDispenser(activity);
private static class PulsarJmsActionDispenser implements ActionDispenser {
private final PulsarJmsActivity activity;
public PulsarJmsActionDispenser(PulsarJmsActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new PulsarJmsAction(activity, slot);
}
}
}