Add bytes counter to consumer

This commit is contained in:
Enrico Olivelli 2021-03-23 16:07:38 +01:00
parent c166b479ca
commit d7dfafaae1
3 changed files with 26 additions and 5 deletions

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
@ -20,14 +22,20 @@ import java.util.function.LongFunction;
public class PulsarConsumerMapper extends PulsarOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> asyncApiFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Consumer<?>> consumerFunc,
LongFunction<Boolean> asyncApiFunc) {
LongFunction<Boolean> asyncApiFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
super(cmdTpl, clientSpace);
this.consumerFunc = consumerFunc;
this.asyncApiFunc = asyncApiFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
}
@Override
@ -39,7 +47,9 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
consumer,
clientSpace.getPulsarSchema(),
asyncApi,
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds()
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
bytesCounter,
messagesizeHistogram
);
}
}

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@ -18,12 +20,18 @@ public class PulsarConsumerOp implements PulsarOp {
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
private final int timeoutSeconds;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds) {
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds,
Counter bytesCounter,
Histogram messagesizeHistogram) {
this.consumer = consumer;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
this.timeoutSeconds = timeoutSeconds;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
}
public void syncConsume() {
@ -54,7 +62,9 @@ public class PulsarConsumerOp implements PulsarOp {
logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData()));
}
}
int messagesize = message.getData().length;
bytesCounter.inc(messagesize);
messagesizeHistogram.update(messagesize);
consumer.acknowledge(message.getMessageId());
} catch (Exception e) {
throw new RuntimeException(e);

View File

@ -249,7 +249,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func);
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func,
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram());
}
private LongFunction<PulsarOp> resolveMsgRead(