mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Address merge conflict
This commit is contained in:
commit
e77635437c
@ -15,6 +15,12 @@ scenarios:
|
||||
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
|
||||
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
|
||||
bindings:
|
||||
# To enable an optional weighted set of hosts in place of a load balancer
|
||||
# Examples
|
||||
# single host: stargate_host=host1
|
||||
# multiple hosts: stargate_host=host1,host2,host3
|
||||
# multiple weighted hosts: stargate_host=host1:3,host2:7
|
||||
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
|
||||
# http request id
|
||||
request_id: ToHashedUUID(); ToString();
|
||||
|
||||
@ -62,7 +68,7 @@ blocks:
|
||||
tags:
|
||||
phase: rampup
|
||||
statements:
|
||||
- rampup-insert: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>
|
||||
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -85,7 +91,7 @@ blocks:
|
||||
params:
|
||||
ratio: <<read_ratio:1>>
|
||||
statements:
|
||||
- main-select: GET http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>?where=E[[{"machine_id":{"$eq":"{machine_id}"},"sensor_name":{"$eq":"{sensor_name}"}}]]&page-size=<<limit:10>>
|
||||
- main-select: GET <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>?where=E[[{"machine_id":{"$eq":"{machine_id}"},"sensor_name":{"$eq":"{sensor_name}"}}]]&page-size=<<limit:10>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -99,7 +105,7 @@ blocks:
|
||||
params:
|
||||
ratio: <<write_ratio:9>>
|
||||
statements:
|
||||
- main-write: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>
|
||||
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:iot>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
|
@ -11,6 +11,12 @@ scenarios:
|
||||
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
|
||||
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
|
||||
bindings:
|
||||
# To enable an optional weighted set of hosts in place of a load balancer
|
||||
# Examples
|
||||
# single host: stargate_host=host1
|
||||
# multiple hosts: stargate_host=host1,host2,host3
|
||||
# multiple weighted hosts: stargate_host=host1:3,host2:7
|
||||
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
|
||||
# http request id
|
||||
request_id: ToHashedUUID(); ToString();
|
||||
|
||||
@ -44,7 +50,7 @@ blocks:
|
||||
tags:
|
||||
phase: rampup
|
||||
statements:
|
||||
- rampup-insert: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>
|
||||
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -63,7 +69,7 @@ blocks:
|
||||
params:
|
||||
ratio: 5
|
||||
statements:
|
||||
- main-select: GET http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>/{rw_key}
|
||||
- main-select: GET <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>/{rw_key}
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -77,7 +83,7 @@ blocks:
|
||||
params:
|
||||
ratio: 5
|
||||
statements:
|
||||
- main-write: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>
|
||||
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:keyvalue>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
|
@ -12,6 +12,12 @@ scenarios:
|
||||
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
|
||||
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
|
||||
bindings:
|
||||
# To enable an optional weighted set of hosts in place of a load balancer
|
||||
# Examples
|
||||
# single host: stargate_host=host1
|
||||
# multiple hosts: stargate_host=host1,host2,host3
|
||||
# multiple weighted hosts: stargate_host=host1:3,host2:7
|
||||
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
|
||||
# http request id
|
||||
request_id: ToHashedUUID(); ToString();
|
||||
# for ramp-up and verify
|
||||
@ -53,7 +59,7 @@ blocks:
|
||||
tags:
|
||||
phase: rampup
|
||||
statements:
|
||||
- rampup-insert: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>
|
||||
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -73,7 +79,7 @@ blocks:
|
||||
params:
|
||||
ratio: 5
|
||||
statements:
|
||||
- main-select: GET http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>/{part_read}&page-size={limit}
|
||||
- main-select: GET <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>/{part_read}&page-size={limit}
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
@ -87,7 +93,7 @@ blocks:
|
||||
params:
|
||||
ratio: 5
|
||||
statements:
|
||||
- main-write: POST http://<<stargate_host:stargate>>:<<stargate_port:8082>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>
|
||||
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8082>><<path_prefix:>>/v2/keyspaces/<<keyspace:baselines>>/<<table:tabular>>
|
||||
Accept: "application/json"
|
||||
X-Cassandra-Request-Id: "{request_id}"
|
||||
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
|
||||
|
@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -14,10 +15,12 @@ public class PulsarAction implements SyncAction {
|
||||
|
||||
private final int slot;
|
||||
private final PulsarActivity activity;
|
||||
int maxTries = 1;
|
||||
|
||||
public PulsarAction(PulsarActivity activity, int slot) {
|
||||
this.activity = activity;
|
||||
this.slot = slot;
|
||||
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -26,6 +29,7 @@ public class PulsarAction implements SyncAction {
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
long start = System.nanoTime();
|
||||
|
||||
PulsarOp pulsarOp;
|
||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||
@ -33,17 +37,26 @@ public class PulsarAction implements SyncAction {
|
||||
pulsarOp = readyPulsarOp.apply(cycle);
|
||||
} catch (Exception bindException) {
|
||||
// if diagnostic mode ...
|
||||
activity.getErrorhandler().handleError(bindException, cycle, 0);
|
||||
throw new RuntimeException(
|
||||
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
|
||||
);
|
||||
}
|
||||
|
||||
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
|
||||
pulsarOp.run();
|
||||
for (int i = 0; i < maxTries; i++) {
|
||||
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
|
||||
pulsarOp.run();
|
||||
break;
|
||||
} catch (RuntimeException err) {
|
||||
ErrorDetail errorDetail = activity
|
||||
.getErrorhandler()
|
||||
.handleError(err, cycle, System.nanoTime() - start);
|
||||
if (!errorDetail.isRetryable()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: add retries and use standard error handler
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
|
||||
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
|
||||
@ -26,6 +28,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
|
||||
public Timer bindTimer;
|
||||
public Timer executeTimer;
|
||||
public Counter bytesCounter;
|
||||
public Histogram messagesizeHistogram;
|
||||
|
||||
private PulsarSpaceCache pulsarCache;
|
||||
private PulsarAdmin pulsarAdmin;
|
||||
|
||||
@ -100,6 +105,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
|
||||
|
||||
String pulsarClntConfFile =
|
||||
activityDef.getParams().getOptionalString("config").orElse("config.properties");
|
||||
@ -114,7 +121,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
|
||||
pulsarCache = new PulsarSpaceCache(this);
|
||||
|
||||
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache));
|
||||
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this));
|
||||
setDefaultsFromOpSequence(sequencer);
|
||||
onActivityDefUpdate(activityDef);
|
||||
|
||||
@ -124,6 +131,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
);
|
||||
}
|
||||
|
||||
public NBErrorHandler getErrorhandler() {
|
||||
return errorhandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
@ -152,4 +163,12 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
public Timer getExecuteTimer() {
|
||||
return this.executeTimer;
|
||||
}
|
||||
|
||||
public Counter getBytesCounter() {
|
||||
return bytesCounter;
|
||||
}
|
||||
|
||||
public Histogram getMessagesizeHistogram() {
|
||||
return messagesizeHistogram;
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,10 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
@ -347,7 +351,8 @@ public class PulsarSpace {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
|
||||
Map<String, Object> consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap());
|
||||
consumerConf.remove("timeout");
|
||||
|
||||
// Explicit topic names will take precedence over topics pattern
|
||||
if (!topicNames.isEmpty()) {
|
||||
|
@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
@ -20,14 +22,20 @@ import java.util.function.LongFunction;
|
||||
public class PulsarConsumerMapper extends PulsarOpMapper {
|
||||
private final LongFunction<Consumer<?>> consumerFunc;
|
||||
private final LongFunction<Boolean> asyncApiFunc;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
||||
PulsarSpace clientSpace,
|
||||
LongFunction<Consumer<?>> consumerFunc,
|
||||
LongFunction<Boolean> asyncApiFunc) {
|
||||
LongFunction<Boolean> asyncApiFunc,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
super(cmdTpl, clientSpace);
|
||||
this.consumerFunc = consumerFunc;
|
||||
this.asyncApiFunc = asyncApiFunc;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -39,7 +47,9 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
|
||||
consumer,
|
||||
clientSpace.getPulsarSchema(),
|
||||
asyncApi,
|
||||
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds()
|
||||
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
|
||||
bytesCounter,
|
||||
messagesizeHistogram
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -18,12 +20,18 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
private final Schema<?> pulsarSchema;
|
||||
private final boolean asyncPulsarOp;
|
||||
private final int timeoutSeconds;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds) {
|
||||
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
this.consumer = consumer;
|
||||
this.pulsarSchema = schema;
|
||||
this.asyncPulsarOp = asyncPulsarOp;
|
||||
this.timeoutSeconds = timeoutSeconds;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
}
|
||||
|
||||
public void syncConsume() {
|
||||
@ -54,7 +62,9 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData()));
|
||||
}
|
||||
}
|
||||
|
||||
int messagesize = message.getData().length;
|
||||
bytesCounter.inc(messagesize);
|
||||
messagesizeHistogram.update(messagesize);
|
||||
consumer.acknowledge(message.getMessageId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
@ -22,18 +24,24 @@ public class PulsarProducerMapper extends PulsarOpMapper {
|
||||
private final LongFunction<Boolean> asyncApiFunc;
|
||||
private final LongFunction<String> keyFunc;
|
||||
private final LongFunction<String> payloadFunc;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarProducerMapper(CommandTemplate cmdTpl,
|
||||
PulsarSpace clientSpace,
|
||||
LongFunction<Producer<?>> producerFunc,
|
||||
LongFunction<Boolean> asyncApiFunc,
|
||||
LongFunction<String> keyFunc,
|
||||
LongFunction<String> payloadFunc) {
|
||||
LongFunction<String> payloadFunc,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
super(cmdTpl, clientSpace);
|
||||
this.producerFunc = producerFunc;
|
||||
this.asyncApiFunc = asyncApiFunc;
|
||||
this.keyFunc = keyFunc;
|
||||
this.payloadFunc = payloadFunc;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -48,6 +56,8 @@ public class PulsarProducerMapper extends PulsarOpMapper {
|
||||
clientSpace.getPulsarSchema(),
|
||||
asyncApi,
|
||||
msgKey,
|
||||
msgPayload);
|
||||
msgPayload,
|
||||
bytesCounter,
|
||||
messagesizeHistogram);
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.PulsarAction;
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -22,17 +23,23 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
private final String msgKey;
|
||||
private final String msgPayload;
|
||||
private final boolean asyncPulsarOp;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarProducerOp(Producer<?> producer,
|
||||
Schema<?> schema,
|
||||
boolean asyncPulsarOp,
|
||||
String key,
|
||||
String payload) {
|
||||
String payload,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
this.producer = producer;
|
||||
this.pulsarSchema = schema;
|
||||
this.msgKey = key;
|
||||
this.msgPayload = payload;
|
||||
this.asyncPulsarOp = asyncPulsarOp;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -45,7 +52,7 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
if ((msgKey != null) && (!msgKey.isEmpty())) {
|
||||
typedMessageBuilder = typedMessageBuilder.key(msgKey);
|
||||
}
|
||||
|
||||
int messagesize;
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
|
||||
@ -54,9 +61,15 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
msgPayload
|
||||
);
|
||||
typedMessageBuilder = typedMessageBuilder.value(payload);
|
||||
// TODO: add a way to calculate the message size for AVRO messages
|
||||
messagesize = msgPayload.length();
|
||||
} else {
|
||||
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
|
||||
byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8);
|
||||
typedMessageBuilder = typedMessageBuilder.value(array);
|
||||
messagesize = array.length;
|
||||
}
|
||||
messagesizeHistogram.update(messagesize);
|
||||
bytesCounter.inc(messagesize);
|
||||
|
||||
//TODO: add error handling with failed message production
|
||||
if (!asyncPulsarOp) {
|
||||
|
@ -23,11 +23,13 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final PulsarSpace clientSpace;
|
||||
private final LongFunction<PulsarOp> opFunc;
|
||||
private final PulsarActivity pulsarActivity;
|
||||
|
||||
// TODO: Add docs for the command template with respect to the OpTemplate
|
||||
|
||||
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
|
||||
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache, PulsarActivity pulsarActivity) {
|
||||
// TODO: Consider parsing map structures into equivalent binding representation
|
||||
this.pulsarActivity = pulsarActivity;
|
||||
this.opTpl = opTemplate;
|
||||
this.cmdTpl = new CommandTemplate(opTemplate);
|
||||
|
||||
@ -239,7 +241,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
producerFunc,
|
||||
async_api_func,
|
||||
keyFunc,
|
||||
valueFunc);
|
||||
valueFunc,
|
||||
pulsarActivity.getBytesCounter(),
|
||||
pulsarActivity.getMessagesizeHistogram());
|
||||
}
|
||||
|
||||
private LongFunction<PulsarOp> resolveMsgConsume(
|
||||
@ -304,7 +308,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
consumer_name_func.apply(l)
|
||||
);
|
||||
|
||||
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func);
|
||||
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func,
|
||||
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram());
|
||||
}
|
||||
|
||||
private LongFunction<PulsarOp> resolveMsgRead(
|
||||
|
Loading…
Reference in New Issue
Block a user