Add messagesize histogram

This commit is contained in:
Enrico Olivelli 2021-03-23 12:17:46 +01:00
parent 0f53ea63a2
commit c166b479ca
5 changed files with 28 additions and 11 deletions

View File

@ -27,7 +27,7 @@
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
@ -75,7 +75,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.1</version>
<version>1.10.1</version>
</dependency>
</dependencies>

View File

@ -1,7 +1,7 @@
package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
@ -23,6 +23,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer bindTimer;
public Timer executeTimer;
public Counter bytesCounter;
public Histogram messagesizeHistogram;
private PulsarSpaceCache pulsarCache;
private PulsarNBClientConf clientConf;
@ -45,7 +46,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
@ -95,4 +96,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Counter getBytesCounter() {
return bytesCounter;
}
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@ -24,6 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
@ -31,13 +33,15 @@ public class PulsarProducerMapper extends PulsarOpMapper {
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
Counter bytesCounter) {
Counter bytesCounter,
Histogram messagesizeHistogram) {
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
}
@Override
@ -53,6 +57,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
asyncApi,
msgKey,
msgPayload,
bytesCounter);
bytesCounter,
messagesizeHistogram);
}
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@ -23,19 +24,22 @@ public class PulsarProducerOp implements PulsarOp {
private final String msgPayload;
private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload,
Counter bytesCounter) {
Counter bytesCounter,
Histogram messagesizeHistogram) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
}
@Override
@ -48,7 +52,7 @@ public class PulsarProducerOp implements PulsarOp {
if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
int messagesize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
@ -58,12 +62,14 @@ public class PulsarProducerOp implements PulsarOp {
);
typedMessageBuilder = typedMessageBuilder.value(payload);
// TODO: add a way to calculate the message size for AVRO messages
bytesCounter.inc(msgPayload.length());
messagesize = msgPayload.length();
} else {
byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8);
typedMessageBuilder = typedMessageBuilder.value(array);
bytesCounter.inc(array.length);
messagesize = array.length;
}
messagesizeHistogram.update(messagesize);
bytesCounter.inc(messagesize);
//TODO: add error handling with failed message production
if (!asyncPulsarOp) {

View File

@ -183,7 +183,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
async_api_func,
keyFunc,
valueFunc,
pulsarActivity.getBytesCounter());
pulsarActivity.getBytesCounter(),
pulsarActivity.getMessagesizeHistogram());
}
private LongFunction<PulsarOp> resolveMsgConsume(