This commit is contained in:
Enrico Olivelli 2021-03-30 10:24:05 +02:00
parent f10bd283f3
commit 5dfcc59602
14 changed files with 56 additions and 48 deletions

View File

@ -29,6 +29,10 @@ public class PulsarAction implements SyncAction {
@Override
public int runCycle(long cycle) {
// let's fail the action if some async operation failed
activity.failOnAsyncOperationFailure();
long start = System.nanoTime();
PulsarOp pulsarOp;

View File

@ -42,6 +42,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
private NBErrorHandler errorhandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
private volatile Throwable asyncOperationFailure;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
@ -172,5 +173,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
return messagesizeHistogram;
}
public void failOnAsyncOperationFailure() {
if (asyncOperationFailure != null) {
throw new RuntimeException(asyncOperationFailure);
}
}
public void asyncOperationFailed(Throwable ex) {
this.asyncOperationFailure = asyncOperationFailure;
}
}

View File

@ -1,25 +1,16 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp extends SimplePulsarOp {
public class PulsarAdminOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);

View File

@ -1,17 +1,14 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp extends SimplePulsarOp {
public class PulsarBatchProducerEndOp extends SyncPulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();

View File

@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp extends SimplePulsarOp {
public class PulsarBatchProducerOp extends SyncPulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;

View File

@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp extends SimplePulsarOp {
public class PulsarBatchProducerStartOp extends SyncPulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();

View File

@ -9,10 +9,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp extends SimplePulsarOp {
public class PulsarConsumerOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);

View File

@ -5,5 +5,10 @@ package io.nosqlbench.driver.pulsar.ops;
*/
public interface PulsarOp {
void run(Runnable callback);
/**
* Execute the operation, invoke the timeTracker when the operation ended.
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
* @param timeTracker
*/
void run(Runnable timeTracker);
}

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@ -24,8 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
@ -33,15 +33,13 @@ public class PulsarProducerMapper extends PulsarOpMapper {
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
}
@Override
@ -57,7 +55,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
asyncApi,
msgKey,
msgPayload,
bytesCounter,
messagesizeHistogram);
pulsarActivity
);
}
}

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@ -25,21 +26,22 @@ public class PulsarProducerOp implements PulsarOp {
private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
}
@Override
@ -89,6 +91,7 @@ public class PulsarProducerOp implements PulsarOp {
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
pulsarActivity.asyncOperationFailed(ex);
return null;
});
} catch (Exception e) {

View File

@ -8,7 +8,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp extends SimplePulsarOp {
public class PulsarReaderOp extends SyncPulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;

View File

@ -239,8 +239,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
async_api_func,
keyFunc,
valueFunc,
pulsarActivity.getBytesCounter(),
pulsarActivity.getMessagesizeHistogram());
pulsarActivity);
}
private LongFunction<PulsarOp> resolveMsgConsume(

View File

@ -1,14 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public abstract class SimplePulsarOp implements PulsarOp {
public void run(Runnable timeTracker) {
this.run();
timeTracker.run();
}
public abstract void run();
}

View File

@ -0,0 +1,17 @@
package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Sync Pulsar Operations including Producers and Consumers.
*/
public abstract class SyncPulsarOp implements PulsarOp {
public void run(Runnable timeTracker) {
try {
this.run();
} finally {
timeTracker.run();
}
}
public abstract void run();
}