Complete the first draft of NB5 Pulsar adapter code, except the per-thread rate limiter.

This commit is contained in:
yabinmeng 2022-11-17 10:39:37 -06:00
parent e128d2867a
commit 12f8697e0e
6 changed files with 88 additions and 45 deletions

View File

@ -16,7 +16,6 @@
package io.nosqlbench.adapter.pulsar;
import com.codahale.metrics.Gauge;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarNBClientConf;
@ -24,7 +23,6 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -35,12 +33,13 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarSpace {
public class PulsarSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
private final String name;
private final String spaceName;
private final NBConfiguration cfg;
private final String pulsarSvcUrl;
@ -51,8 +50,13 @@ public class PulsarSpace {
private PulsarAdmin pulsarAdmin;
private Schema<?> pulsarSchema;
public PulsarSpace(String name, NBConfiguration cfg) {
this.name = name;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
public PulsarSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
this.pulsarSvcUrl = cfg.get("service_url");
@ -82,6 +86,17 @@ public class PulsarSpace {
public PulsarClient getPulsarClient() { return pulsarClient; }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public Schema<?> getPulsarSchema() { return pulsarSchema; }
public int getProducerSetCnt() { return producers.size(); }
public int getConsumerSetCnt() { return consumers.size(); }
public int getReaderSetCnt() { return readers.size(); }
public Producer<?> getProducer(String name) { return producers.get(name); }
public void setProducer(String name, Producer<?> producer) { producers.put(name, producer); }
public Consumer<?> getConsumer(String name) { return consumers.get(name); }
public void setConsumer(String name, Consumer<?> consumer) { consumers.put(name, consumer); }
public Reader<?> getReader(String name) { return readers.get(name); }
public void setReader(String name, Reader<?> reader) { readers.put(name, reader); }
/**
* Initialize
@ -147,6 +162,26 @@ public class PulsarSpace {
}
}
public void shutdownSpace() {
try {
for (Producer<?> producer : producers.values()) {
if (producer != null) producer.close();
}
for (Consumer<?> consumer : consumers.values()) {
if (consumer != null) consumer.close();
}
for (Reader<?> reader : readers.values()) {
if (reader != null) reader.close();
}
if (pulsarAdmin != null) pulsarAdmin.close();
if (pulsarClient != null) pulsarClient.close();
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException(
"Unexpected error when shutting down the Pulsar space \"" + spaceName + "\"!");
}
}
/**
* Get Pulsar schema from the definition string
*/
@ -185,6 +220,11 @@ public class PulsarSpace {
pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType);
}
}
@Override
public void close() {
shutdownSpace();
}
}

View File

@ -23,9 +23,6 @@ import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
@ -37,7 +34,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@ -51,10 +47,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
protected final ParsedOp parsedOp;
protected final PulsarSpace pulsarSpace;
protected final PulsarAdapterMetrics pulsarAdapterMetrics;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<String> tgtNameFunc;
@ -62,8 +54,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
protected final long totalCycleNum;
protected RateLimiter per_thread_cyclelimiter;
public PulsarBaseOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
@ -81,17 +71,10 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
this.pulsarAdapterMetrics = new PulsarAdapterMetrics(this, defaultMetricsPrefix);
if (instrument) {
pulsarAdapterMetrics.initPulsarAdapterInstrumentation();
}
pulsarAdapterMetrics.initPulsarAdapterInstrumentation();
totalThreadNum = NumberUtils.toInt(parsedOp.getStaticValue("threads"));
totalCycleNum = NumberUtils.toLong(parsedOp.getStaticValue("cycles"));
this.parsedOp.getOptionalStaticConfig("per_thread_cyclerate", String.class)
.map(RateSpec::new)
.ifPresent(spec -> per_thread_cyclelimiter =
RateLimiters.createOrUpdate(this, "cycles", per_thread_cyclelimiter, spec));
}
@Override
@ -187,11 +170,11 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
apiMetricsPrefix = apiType;
if (apiType.equalsIgnoreCase(PulsarAdapterUtil.PULSAR_API_TYPE.PRODUCER.label))
apiMetricsPrefix += producers.size();
apiMetricsPrefix += pulsarSpace.getProducerSetCnt();
else if (apiType.equalsIgnoreCase(PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label))
apiMetricsPrefix += consumers.size();
apiMetricsPrefix += pulsarSpace.getConsumerSetCnt();
else if (apiType.equalsIgnoreCase(PulsarAdapterUtil.PULSAR_API_TYPE.READER.label))
apiMetricsPrefix += readers.size();
apiMetricsPrefix += pulsarSpace.getReaderSetCnt();
apiMetricsPrefix += "_";
}
@ -257,7 +240,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
String producerName = getEffectiveProducerName(cycleProducerName);
String producerCacheKey = PulsarAdapterUtil.buildCacheKey(producerName, topicName);
Producer<?> producer = producers.get(producerCacheKey);
Producer<?> producer = pulsarSpace.getProducer(producerCacheKey);
if (producer == null) {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
@ -280,7 +263,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
}
producer = producerBuilder.create();
producers.put(producerCacheKey, producer);
pulsarSpace.setProducer(producerCacheKey, producer);
if (instrument) {
pulsarAdapterMetrics.registerProducerApiMetrics(producer,
@ -469,7 +452,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerName,
subscriptionName,
consumerTopicListString);
Consumer<?> consumer = consumers.get(consumerCacheKey);
Consumer<?> consumer = pulsarSpace.getConsumer(consumerCacheKey);
if (consumer == null) {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
@ -525,7 +508,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
}
consumer = consumerBuilder.subscribe();
consumers.put(consumerCacheKey, consumer);
pulsarSpace.setConsumer(consumerCacheKey, consumer);
if (instrument) {
pulsarAdapterMetrics.registerConsumerApiMetrics(
@ -640,7 +623,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
}
String readerCacheKey = PulsarAdapterUtil.buildCacheKey(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(readerCacheKey);
Reader<?> reader = pulsarSpace.getReader(readerCacheKey);
if (reader == null) {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();;
@ -676,7 +659,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(readerCacheKey, reader);
pulsarSpace.setReader(readerCacheKey, reader);
}
return reader;

View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.pulsar.exception;
public class PulsarAdapterAsyncOperationFailedException extends RuntimeException {
public PulsarAdapterAsyncOperationFailedException(Throwable t) {
super(t);
printStackTrace();
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.pulsar.ops;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.*;
import org.apache.commons.lang3.StringUtils;
@ -127,15 +128,14 @@ public class MessageConsumerOp extends PulsarClientOp {
try {
handleMessage(transaction, message);
} catch (PulsarClientException | TimeoutException e) {
pulsarActivity.asyncOperationFailed(e);
throw new PulsarAdapterAsyncOperationFailedException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
pulsarActivity.asyncOperationFailed(e.getCause());
throw new PulsarAdapterAsyncOperationFailedException(e.getCause());
}
}).exceptionally(ex -> {
pulsarActivity.asyncOperationFailed(ex);
return null;
throw new PulsarAdapterAsyncOperationFailedException(ex);
});
}
catch (Exception e) {

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.pulsar.ops;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
@ -231,8 +232,6 @@ public class MessageProducerOp extends PulsarClientOp {
throw new PulsarAdapterUnexpectedException(errMsg);
}
timeTracker.run();
}
else {
try {
@ -272,16 +271,13 @@ public class MessageProducerOp extends PulsarClientOp {
msgValue);
}
}
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Async message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgPropRawJsonStr + "; " +
"payload - " + msgValue);
pulsarActivity.asyncOperationFailed(ex);
return null;
throw new PulsarAdapterAsyncOperationFailedException(ex);
});
}
catch (Exception e) {

View File

@ -20,9 +20,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.dispensers.PulsarBaseOpDispenser;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;