align remaining adapters to API (smaller) changes

This commit is contained in:
Jonathan Shook 2024-10-24 14:21:11 -05:00
parent 109ca07b2d
commit defa19ab7a
49 changed files with 324 additions and 268 deletions

View File

@ -30,6 +30,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "amqp")
public class AmqpDriverAdapter extends BaseDriverAdapter<AmqpTimeTrackOp, AmqpSpace> {
@ -40,14 +41,12 @@ public class AmqpDriverAdapter extends BaseDriverAdapter<AmqpTimeTrackOp, AmqpSp
}
@Override
public OpMapper<AmqpTimeTrackOp> getOpMapper() {
StringDriverSpaceCache<? extends AmqpSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new AmqpOpMapper(this, adapterConfig, spaceCache);
public OpMapper<AmqpTimeTrackOp, AmqpSpace> getOpMapper() {
return new AmqpOpMapper(this, getConfiguration(), getSpaceCache());
}
@Override
public Function<String, ? extends AmqpSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<AmqpSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new AmqpSpace(s, cfg);
}

View File

@ -19,6 +19,8 @@ package io.nosqlbench.adapter.amqp;
import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgRecvOpDispenser;
import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgSendOpDispenser;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
@ -29,23 +31,26 @@ import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp> {
import java.util.function.LongFunction;
public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp,AmqpSpace> {
private final static Logger logger = LogManager.getLogger(AmqpOpMapper.class);
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends AmqpSpace> spaceCache;
private final ConcurrentSpaceCache<AmqpSpace> spaceCache;
private final DriverAdapter adapter;
public AmqpOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends AmqpSpace> spaceCache) {
public AmqpOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<AmqpSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends AmqpTimeTrackOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction spaceInitF) {
//public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction<AmqpTimeTrackOp> spaceInitF) {
int spaceName = op.getStaticConfigOr("space", 0);
AmqpSpace amqpSpace = spaceCache.get(spaceName);
/*
@ -68,4 +73,5 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp> {
}
}
}

View File

@ -23,6 +23,7 @@ import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil;
import io.nosqlbench.adapter.amqp.util.AmqpClientConf;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -44,11 +45,10 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class AmqpSpace implements AutoCloseable {
public class AmqpSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(AmqpSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
private final AmqpClientConf amqpClientConf;
@ -117,8 +117,8 @@ public class AmqpSpace implements AutoCloseable {
private long totalCycleNum;
private long totalThreadNum;
public AmqpSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
public AmqpSpace(long idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
String amqpClientConfFileName = cfg.get("config");

View File

@ -27,6 +27,7 @@ import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
import java.util.function.LongFunction;
// TODO: Add details to dataapi.md in main resources folder, a la cqld4.md
@ -42,7 +43,7 @@ public class DataApiDriverAdapter extends BaseDriverAdapter<DataApiBaseOp, DataA
}
@Override
public Function<String, ? extends DataApiSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<DataApiSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new DataApiSpace(s, cfg);
}

View File

@ -21,12 +21,14 @@ import io.nosqlbench.adapter.dataapi.ops.DataApiBaseOp;
import io.nosqlbench.adapter.dataapi.ops.DataApiOpType;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class DataApiOpMapper implements OpMapper<DataApiBaseOp> {
public class DataApiOpMapper implements OpMapper<DataApiBaseOp,DataApiSpace> {
private static final Logger logger = LogManager.getLogger(DataApiOpMapper.class);
private final DataApiDriverAdapter adapter;
@ -34,8 +36,10 @@ public class DataApiOpMapper implements OpMapper<DataApiBaseOp> {
this.adapter = dataApiDriverAdapter;
}
@Override
public OpDispenser<? extends DataApiBaseOp> apply(ParsedOp op) {
public OpDispenser<DataApiBaseOp> apply(ParsedOp op, LongFunction<DataApiSpace> spaceInitF) {
//public OpDispenser<DataApiBaseOp> apply(ParsedOp op, LongFunction<DataApiSpace> spaceInitF) {
TypeAndTarget<DataApiOpType, String> typeAndTarget = op.getTypeAndTarget(
DataApiOpType.class,
String.class,
@ -80,4 +84,5 @@ public class DataApiOpMapper implements OpMapper<DataApiBaseOp> {
case drop_namespace -> new DataApiDropNamespaceOpDispenser(adapter, op, typeAndTarget.targetFunction);
};
}
}

View File

@ -20,6 +20,7 @@ import com.datastax.astra.client.DataAPIClient;
import com.datastax.astra.client.Database;
import com.datastax.astra.client.admin.AstraDBAdmin;
import com.datastax.astra.client.admin.DatabaseAdmin;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -34,10 +35,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
public class DataApiSpace {
public class DataApiSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(DataApiSpace.class);
private final NBConfiguration config;
private final String name;
private String astraToken;
private String astraApiEndpoint;
private DataAPIClient dataAPIClient;
@ -48,9 +48,9 @@ public class DataApiSpace {
private AstraDBAdmin admin;
private DatabaseAdmin namespaceAdmin;
public DataApiSpace(String name, NBConfiguration cfg) {
public DataApiSpace(long name, NBConfiguration cfg) {
super(name);
this.config = cfg;
this.name = name;
setToken();
setSuperToken();
setApiEndpoint();

View File

@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "diag")
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> implements SyntheticOpTemplateProvider {
@ -54,7 +55,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
}
@Override
public synchronized OpMapper<DiagOp> getOpMapper() {
public synchronized OpMapper<DiagOp,DiagSpace> getOpMapper() {
if (this.mapper == null) {
this.mapper = new DiagOpMapper(this);
}
@ -62,8 +63,8 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
}
@Override
public Function<String, ? extends DiagSpace> getSpaceInitializer(NBConfiguration cfg) {
return (String name) -> new DiagSpace(name, cfg);
public LongFunction<DiagSpace> getSpaceInitializer(NBConfiguration cfg) {
return (long name) -> new DiagSpace(name, cfg);
}
@Override

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.diag;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -28,7 +29,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;
public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
public class DiagOpMapper implements OpMapper<DiagOp,DiagSpace>, NBReconfigurable {
private final Map<String,DiagOpDispenser> dispensers = new LinkedHashMap<>();
private final DiagDriverAdapter adapter;
@ -37,7 +38,7 @@ public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
}
@Override
public OpDispenser<? extends DiagOp> apply(ParsedOp op) {
public OpDispenser<DiagOp> apply(ParsedOp op, LongFunction<DiagSpace> spaceInitF) {
LongFunction<DiagSpace> spaceF = adapter.getSpaceFunc(op);
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,spaceF,op);
dispensers.put(op.getName(),dispenser);

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.diag;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
@ -26,20 +27,19 @@ import io.nosqlbench.nb.api.config.standard.Param;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DiagSpace implements ActivityDefObserver, AutoCloseable {
public class DiagSpace extends BaseSpace implements ActivityDefObserver {
private final Logger logger = LogManager.getLogger(DiagSpace.class);
private final NBConfiguration cfg;
private final String name;
private RateLimiter diagRateLimiter;
private long interval;
private boolean errorOnClose;
public DiagSpace(String name, NBConfiguration cfg) {
public DiagSpace(long idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
this.name = name;
applyConfig(cfg);
logger.trace(() -> "diag space initialized as '" + name + "'");
logger.trace(() -> "diag space initialized as '" + idx + "'");
}
public void applyConfig(NBConfiguration cfg) {
@ -68,7 +68,7 @@ public class DiagSpace implements ActivityDefObserver, AutoCloseable {
@Override
public void close() throws Exception {
logger.debug(() -> "closing diag space '" + this.name + "'");
logger.debug(() -> "closing diag space '" + getName() + "'");
if (errorOnClose) {
throw new RuntimeException("diag space was configured to throw this error when it was configured.");
}

View File

@ -19,6 +19,7 @@ package io.nosqlbench.adapter.dynamodb;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.nb.api.labels.NBLabels;
@ -29,6 +30,7 @@ import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.Function;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "dynamodb", maturity = Maturity.Experimental)
public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoDBSpace> {
@ -38,14 +40,13 @@ public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoD
}
@Override
public OpMapper<DynamoDBOp> getOpMapper() {
StringDriverSpaceCache<? extends DynamoDBSpace> spaceCache = getSpaceCache();
public OpMapper<DynamoDBOp,DynamoDBSpace> getOpMapper() {
NBConfiguration adapterConfig = getConfiguration();
return new DynamoDBOpMapper(this, adapterConfig, spaceCache);
return new DynamoDBOpMapper(this, adapterConfig, getSpaceCache());
}
@Override
public Function<String, ? extends DynamoDBSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<DynamoDBSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new DynamoDBSpace(s,cfg);
}

View File

@ -21,27 +21,31 @@ import io.nosqlbench.adapter.dynamodb.opdispensers.*;
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
public class DynamoDBOpMapper implements OpMapper<DynamoDBOp> {
import java.util.function.LongFunction;
public class DynamoDBOpMapper implements OpMapper<DynamoDBOp,DynamoDBSpace> {
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends DynamoDBSpace> cache;
private final ConcurrentSpaceCache<DynamoDBSpace> cache;
private final DriverAdapter adapter;
public DynamoDBOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends DynamoDBSpace> cache) {
public DynamoDBOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<DynamoDBSpace> cache) {
this.cfg = cfg;
this.cache = cache;
this.adapter = adapter;
}
@Override
public OpDispenser<DynamoDBOp> apply(ParsedOp op) {
String space = op.getStaticConfigOr("space", "default");
public OpDispenser<DynamoDBOp> apply(ParsedOp op, LongFunction<DynamoDBSpace> spaceInitF) {
int space = op.getStaticConfigOr("space", 0);
DynamoDB ddb = cache.get(space).getDynamoDB();
/*

View File

@ -21,6 +21,7 @@ import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -29,12 +30,11 @@ import io.nosqlbench.nb.api.errors.OpConfigError;
import java.util.Optional;
public class DynamoDBSpace {
private final String name;
public class DynamoDBSpace extends BaseSpace {
DynamoDB dynamoDB;
public DynamoDBSpace(String name, NBConfiguration cfg) {
this.name = name;
public DynamoDBSpace(long idx, NBConfiguration cfg) {
super(idx);
AmazonDynamoDB client = createClient(cfg);
dynamoDB= new DynamoDB(client);
}

View File

@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "http")
public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
@ -54,15 +55,14 @@ public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
}
@Override
public OpMapper<HttpOp> getOpMapper() {
StringDriverSpaceCache<? extends HttpSpace> spaceCache = getSpaceCache();
public OpMapper<HttpOp,HttpSpace> getOpMapper() {
NBConfiguration config = getConfiguration();
return new HttpOpMapper(this, config, spaceCache);
return new HttpOpMapper(this, config, getSpaceCache());
}
@Override
public Function<String, ? extends HttpSpace> getSpaceInitializer(NBConfiguration cfg) {
return spaceName -> new HttpSpace(this, spaceName, cfg);
public LongFunction<HttpSpace> getSpaceInitializer(NBConfiguration cfg) {
return idx -> new HttpSpace(idx, this, cfg);
}
@Override

View File

@ -18,29 +18,31 @@ package io.nosqlbench.adapter.http.core;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.LongFunction;
public class HttpOpMapper implements OpMapper<HttpOp> {
public class HttpOpMapper implements OpMapper<HttpOp,HttpSpace> {
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends HttpSpace> spaceCache;
private final ConcurrentSpaceCache<? extends HttpSpace> spaceCache;
private final DriverAdapter adapter;
public HttpOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends HttpSpace> spaceCache) {
public HttpOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<HttpSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends HttpOp> apply(ParsedOp op) {
public OpDispenser<HttpOp> apply(ParsedOp op, LongFunction<HttpSpace> spaceInitF) {
LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default");
LongFunction<HttpSpace> spaceFunc = l -> spaceCache.get(spaceNameF.apply(l));
LongFunction<HttpSpace> spaceFunc = l -> spaceCache.get(l);
return new HttpOpDispenser(adapter, spaceFunc, op);
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.http.core;
import io.nosqlbench.adapter.http.HttpDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.labels.NBLabeledElement;
@ -39,11 +40,10 @@ import java.util.Locale;
* HTTP client implementation is meant to be immutable. If shared-state issues
* occur, thread-local support will be re-added.
*/
public class HttpSpace implements NBLabeledElement {
public class HttpSpace extends BaseSpace implements NBLabeledElement {
private final static Logger logger = LogManager.getLogger(HttpSpace.class);
private final HttpDriverAdapter parentAdapter;
private final String name;
private final NBConfiguration cfg;
public NBMetricHistogram statusCodeHistogram;
private HttpConsoleFormats console;
@ -55,9 +55,9 @@ public class HttpSpace implements NBLabeledElement {
private boolean diagnosticsEnabled;
public HttpSpace(HttpDriverAdapter parentAdapter, String spaceName, NBConfiguration cfg) {
public HttpSpace(long idx, HttpDriverAdapter parentAdapter, NBConfiguration cfg) {
super(idx);
this.parentAdapter = parentAdapter;
this.name = spaceName;
this.cfg = cfg;
applyConfig(cfg);
this.statusCodeHistogram = parentAdapter.statusCodeHistogram;
@ -101,11 +101,7 @@ public class HttpSpace implements NBLabeledElement {
@Override
public NBLabels getLabels() {
return NBLabels.forKV("space", getSpaceName());
}
public String getSpaceName() {
return name;
return NBLabels.forKV("space", getName());
}
public boolean isDiagnosticMode() {

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.http;
import io.nosqlbench.adapter.http.core.HttpOpMapper;
import io.nosqlbench.adapter.http.core.HttpSpace;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
@ -49,8 +50,8 @@ public class HttpOpMapperTest {
HttpOpMapperTest.cfg = HttpSpace.getConfigModel().apply(Map.of());
HttpOpMapperTest.adapter = new HttpDriverAdapter(new TestComponent("parent","parent"), NBLabels.forKV());
HttpOpMapperTest.adapter.applyConfig(HttpOpMapperTest.cfg);
final StringDriverSpaceCache<? extends HttpSpace> cache = HttpOpMapperTest.adapter.getSpaceCache();
HttpOpMapperTest.mapper = new HttpOpMapper(HttpOpMapperTest.adapter, HttpOpMapperTest.cfg, cache);
ConcurrentSpaceCache<HttpSpace> spaceCache = HttpOpMapperTest.adapter.getSpaceCache();
HttpOpMapperTest.mapper = new HttpOpMapper(HttpOpMapperTest.adapter, HttpOpMapperTest.cfg, spaceCache);
}
private static ParsedOp parsedOpFor(final String yaml) {

View File

@ -17,20 +17,18 @@
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "kafka")
public class KafkaDriverAdapter extends BaseDriverAdapter<KafkaOp, KafkaSpace> {
private final static Logger logger = LogManager.getLogger(KafkaDriverAdapter.class);
@ -40,16 +38,12 @@ public class KafkaDriverAdapter extends BaseDriverAdapter<KafkaOp, KafkaSpace> {
}
@Override
public OpMapper<KafkaOp> getOpMapper() {
StringDriverSpaceCache<? extends KafkaSpace> spaceCache = getSpaceCache();
public OpMapper<KafkaOp,KafkaSpace> getOpMapper() {
ConcurrentSpaceCache<KafkaSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new KafkaOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends KafkaSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new KafkaSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {

View File

@ -19,34 +19,34 @@ package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.dispensers.MessageConsumerOpDispenser;
import io.nosqlbench.adapter.kafka.dispensers.MessageProducerOpDispenser;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class KafkaOpMapper implements OpMapper<KafkaOp> {
public class KafkaOpMapper implements OpMapper<KafkaOp,KafkaSpace> {
private final static Logger logger = LogManager.getLogger(KafkaOpMapper.class);
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends KafkaSpace> spaceCache;
private final DriverAdapter adapter;
private final ConcurrentSpaceCache<KafkaSpace> spaceCache;
private final DriverAdapter<KafkaOp,KafkaSpace> adapter;
public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends KafkaSpace> spaceCache) {
this.cfg = cfg;
public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<KafkaSpace> spaceCache) {
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends KafkaOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
KafkaSpace kafkaSpace = spaceCache.get(spaceName);
public OpDispenser<KafkaOp> apply(ParsedOp op, LongFunction<KafkaSpace> spaceInitF) {
KafkaSpace kafkaSpace = adapter.getSpaceFunc(op).apply(op.getStaticConfigOr("space",0));
/*
* If the user provides a body element, then they want to provide the JSON or

View File

@ -20,6 +20,8 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -34,11 +36,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class KafkaSpace implements AutoCloseable {
public class KafkaSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(KafkaSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
// TODO: currently this NB Kafka driver only supports String type for message key and value
@ -75,7 +76,7 @@ public class KafkaSpace implements AutoCloseable {
private long totalCycleNum;
private AtomicBoolean beingShutdown = new AtomicBoolean(false);
private final AtomicBoolean beingShutdown = new AtomicBoolean(false);
public record ProducerCacheKey(String producerName, String topicName, String clientId) {
@ -89,8 +90,8 @@ public class KafkaSpace implements AutoCloseable {
new ConcurrentHashMap<>();
public KafkaSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
public KafkaSpace(int idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
this.bootstrapSvr = cfg.get("bootstrap_server");

View File

@ -17,11 +17,13 @@
package io.nosqlbench.adapter.mongodb.core;
import io.nosqlbench.adapter.mongodb.dispensers.MongoCommandOpDispenser;
import io.nosqlbench.adapter.mongodb.ops.MongoDirectCommandOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
@ -29,34 +31,33 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class MongoOpMapper implements OpMapper<Op> {
public class MongoOpMapper<MC extends MongoDirectCommandOp> implements OpMapper<MongoDirectCommandOp,MongoSpace> {
private static final Logger logger = LogManager.getLogger(MongoOpMapper.class);
private final MongodbDriverAdapter adapter;
private final NBConfiguration configuration;
private final StringDriverSpaceCache<? extends MongoSpace> spaceCache;
private final ConcurrentSpaceCache<MongoSpace> spaceCache;
public MongoOpMapper(MongodbDriverAdapter adapter, NBConfiguration cfg,
StringDriverSpaceCache<? extends MongoSpace> spaceCache) {
ConcurrentSpaceCache<MongoSpace> spaceCache) {
this.configuration = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp op) {
public OpDispenser<MongoDirectCommandOp> apply(ParsedOp op, LongFunction<MongoSpace> spaceInitF) {
LongFunction<String> ctxNamer = op.getAsFunctionOr("space", "default");
LongFunction<MongoSpace> spaceF = l -> adapter.getSpaceCache()
.get(ctxNamer.apply(l));
String connectionURL = op.getStaticConfigOr("connection", "unknown");
if (connectionURL == null) {
throw new BasicError("Must provide a connection value for use by the MongoDB adapter.");
}
spaceCache.get(ctxNamer.apply(0L)).createMongoClient(connectionURL);
spaceCache.get(0).createMongoClient(connectionURL);
Optional<LongFunction<String>> oDatabaseF = op.getAsOptionalFunction("database");
if (oDatabaseF.isEmpty()) {
@ -71,14 +72,15 @@ public class MongoOpMapper implements OpMapper<Op> {
if (target.isPresent()) {
TypeAndTarget<MongoDBOpTypes, String> targetData = target.get();
return switch (targetData.enumId) {
case command -> new MongoCommandOpDispenser(adapter, spaceF, op);
case command -> new MongoCommandOpDispenser(adapter, spaceInitF, op);
};
}
// For everything else use the command API
else {
return new MongoCommandOpDispenser(adapter, spaceF, op);
return new MongoCommandOpDispenser(adapter, spaceInitF, op);
}
}
}

View File

@ -22,6 +22,7 @@ import com.mongodb.ServerApi;
import com.mongodb.ServerApiVersion;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.nb.api.components.core.NBNamedElement;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
@ -38,15 +39,12 @@ import org.bson.codecs.configuration.CodecRegistry;
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
public class MongoSpace implements NBNamedElement, AutoCloseable {
public class MongoSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(MongoSpace.class);
private final String spaceName;
private final NBConfiguration mongoConfig;
private MongoClient mongoClient;
public MongoSpace(String name, NBConfiguration cfg) {
this.spaceName = name;
this.mongoConfig = cfg;
public MongoSpace(long idx, NBConfiguration cfg) {
super(idx);
}
public static NBConfigModel getConfigModel() {
@ -59,11 +57,6 @@ public class MongoSpace implements NBNamedElement, AutoCloseable {
}
@Override
public String getName() {
return spaceName;
}
@Override
public void close() {
try {
@ -72,7 +65,7 @@ public class MongoSpace implements NBNamedElement, AutoCloseable {
}
} catch (Exception e) {
logger.error(() -> "auto-closeable mongodb connection threw exception in " +
"mongodb space(" + this.spaceName + "): " + e);
"mongodb space(" + getName() + "): " + e);
throw new RuntimeException(e);
}
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.mongodb.core;
import io.nosqlbench.adapter.mongodb.ops.MongoDirectCommandOp;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
@ -26,26 +27,27 @@ import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
/**
* Special thanks to Justin Chu who authored the original NoSQLBench MongoDB ActivityType.
*/
@Service(value = DriverAdapter.class, selector = "mongodb")
public class MongodbDriverAdapter extends BaseDriverAdapter<Op, MongoSpace> {
public class MongodbDriverAdapter extends BaseDriverAdapter<MongoDirectCommandOp, MongoSpace> {
public MongodbDriverAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public OpMapper<Op> getOpMapper() {
public OpMapper<MongoDirectCommandOp,MongoSpace> getOpMapper() {
return new MongoOpMapper(this, getConfiguration(), getSpaceCache());
}
@Override
public Function<String, ? extends MongoSpace> getSpaceInitializer(NBConfiguration cfg) {
return spaceName -> new MongoSpace(spaceName, cfg);
public LongFunction<MongoSpace> getSpaceInitializer(NBConfiguration cfg) {
return idx -> new MongoSpace(idx, cfg);
}
@Override

View File

@ -20,24 +20,29 @@ import io.nosqlbench.adapter.mongodb.core.MongoSpace;
import io.nosqlbench.adapter.mongodb.ops.MongoDirectCommandOp;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class MongoCommandOpDispenser extends BaseOpDispenser<Op, MongoSpace> {
public class MongoCommandOpDispenser extends BaseOpDispenser<MongoDirectCommandOp, MongoSpace> {
private final LongFunction<MongoDirectCommandOp> mongoOpF;
public MongoCommandOpDispenser(DriverAdapter<Op, MongoSpace> adapter, LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
public MongoCommandOpDispenser(
DriverAdapter adapter, LongFunction<MongoSpace> ctxFunc,
ParsedOp op
) {
super(adapter, op);
this.mongoOpF = createOpFunc(ctxFunc, op);
}
private LongFunction<MongoDirectCommandOp> createOpFunc(LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
private LongFunction<MongoDirectCommandOp> createOpFunc(LongFunction<? extends MongoSpace> ctxFunc, ParsedOp op) {
LongFunction<?> payload = op.getAsRequiredFunction("stmt", Object.class);
Object exampleValue = payload.apply(0);
@ -53,14 +58,14 @@ public class MongoCommandOpDispenser extends BaseOpDispenser<Op, MongoSpace> {
final LongFunction<String> databaseNamerF = op.getAsRequiredFunction("database", String.class);
return l -> new MongoDirectCommandOp(
ctxFunc.apply(l).getClient(),
databaseNamerF.apply(l),
bsonFunc.apply(l)
ctxFunc.apply(l).getClient(),
databaseNamerF.apply(l),
bsonFunc.apply(l)
);
}
@Override
public Op getOp(long cycle) {
public MongoDirectCommandOp getOp(long cycle) {
return mongoOpF.apply(cycle);
}
}

View File

@ -25,8 +25,8 @@ import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "neo4j")
@ -37,12 +37,12 @@ public class Neo4JDriverAdapter extends BaseDriverAdapter<Neo4JBaseOp, Neo4JSpac
}
@Override
public OpMapper getOpMapper() {
public OpMapper<Neo4JBaseOp,Neo4JSpace> getOpMapper() {
return new Neo4JOpMapper(this, getSpaceCache());
}
@Override
public Function<String, ? extends Neo4JSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<Neo4JSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new Neo4JSpace(s, cfg);
}

View File

@ -21,27 +21,26 @@ import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp> {
private final StringDriverSpaceCache<? extends Neo4JSpace> cache;
public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp,Neo4JSpace> {
private final Neo4JDriverAdapter adapter;
public Neo4JOpMapper(Neo4JDriverAdapter adapter, StringDriverSpaceCache<? extends Neo4JSpace> cache) {
public Neo4JOpMapper(Neo4JDriverAdapter adapter, ConcurrentSpaceCache<Neo4JSpace> cache) {
this.adapter = adapter;
this.cache = cache;
}
@Override
public OpDispenser<? extends Neo4JBaseOp> apply(ParsedOp op) {
public OpDispenser<Neo4JBaseOp> apply(ParsedOp op, LongFunction<Neo4JSpace> spaceInitF) {
TypeAndTarget<Neo4JOpType, String> typeAndTarget = op.getTypeAndTarget(Neo4JOpType.class, String.class);
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
LongFunction<Neo4JSpace> spaceFunc = adapter.getSpaceFunc(op);
return switch (typeAndTarget.enumId) {
case sync_autocommit -> new Neo4JSyncAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.BasicError;
@ -34,15 +35,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
public class Neo4JSpace implements AutoCloseable {
public class Neo4JSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(Neo4JSpace.class);
private final String space;
private Driver driver;
private SessionConfig sessionConfig;
public Neo4JSpace(String space, NBConfiguration cfg) {
this.space = space;
public Neo4JSpace(long idx, NBConfiguration cfg) {
super(idx);
this.driver = initializeDriver(cfg);
driver.verifyConnectivity();
}

View File

@ -20,7 +20,7 @@ import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.annotations.Service;
@ -30,6 +30,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "pulsar")
public class PulsarDriverAdapter extends BaseDriverAdapter<PulsarOp, PulsarSpace> {
@ -41,14 +43,14 @@ public class PulsarDriverAdapter extends BaseDriverAdapter<PulsarOp, PulsarSpace
}
@Override
public OpMapper<PulsarOp> getOpMapper() {
StringDriverSpaceCache<? extends PulsarSpace> spaceCache = getSpaceCache();
public OpMapper<PulsarOp,PulsarSpace> getOpMapper() {
ConcurrentSpaceCache<PulsarSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new PulsarOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends PulsarSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<PulsarSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new PulsarSpace(s, cfg);
}

View File

@ -18,34 +18,39 @@ package io.nosqlbench.adapter.pulsar;
import io.nosqlbench.adapter.pulsar.dispensers.*;
import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PulsarOpMapper implements OpMapper<PulsarOp> {
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class PulsarOpMapper implements OpMapper<PulsarOp,PulsarSpace> {
private final static Logger logger = LogManager.getLogger(PulsarOpMapper.class);
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends PulsarSpace> spaceCache;
private final DriverAdapter adapter;
private final ConcurrentSpaceCache<PulsarSpace> spaceCache;
private final PulsarDriverAdapter adapter;
public PulsarOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends PulsarSpace> spaceCache) {
public PulsarOpMapper(PulsarDriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<PulsarSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends PulsarOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
PulsarSpace pulsarSpace = spaceCache.get(spaceName);
public OpDispenser<PulsarOp> apply(ParsedOp op, LongFunction<PulsarSpace> spaceInitF) {
int spaceName = op.getStaticConfigOr("space", 0);
// PulsarSpace pulsarSpace = spaceCache.get(spaceName);
PulsarSpace pulsarSpace = adapter.getSpaceCache().get(spaceName);
/*
* If the user provides a body element, then they want to provide the JSON or

View File

@ -18,6 +18,8 @@ package io.nosqlbench.adapter.pulsar;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarClientConf;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -36,11 +38,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
public class PulsarSpace implements AutoCloseable {
public class PulsarSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
private final String pulsarSvcUrl;
@ -67,8 +68,8 @@ public class PulsarSpace implements AutoCloseable {
private final ConcurrentHashMap<ReaderCacheKey, Reader<?>> readers = new ConcurrentHashMap<>();
public PulsarSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
public PulsarSpace(long idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
this.pulsarSvcUrl = cfg.get("service_url");

View File

@ -27,24 +27,26 @@ import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import static io.nosqlbench.adapter.qdrant.QdrantAdapterUtils.QDRANT;
@Service(value = DriverAdapter.class, selector = QDRANT)
public class QdrantDriverAdapter extends BaseDriverAdapter<QdrantBaseOp<?>, QdrantSpace> {
public class QdrantDriverAdapter extends BaseDriverAdapter<QdrantBaseOp, QdrantSpace> {
public QdrantDriverAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public OpMapper<QdrantBaseOp<?>> getOpMapper() {
public OpMapper<QdrantBaseOp,QdrantSpace> getOpMapper() {
return new QdrantOpMapper(this);
}
@Override
public Function<String, ? extends QdrantSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new QdrantSpace(s, cfg);
public LongFunction<QdrantSpace> getSpaceInitializer(NBConfiguration cfg) {
return (long s) -> new QdrantSpace(s, cfg);
}
@Override

View File

@ -21,12 +21,16 @@ import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
import io.nosqlbench.adapter.qdrant.types.QdrantOpType;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class QdrantOpMapper implements OpMapper<QdrantBaseOp<?>> {
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class QdrantOpMapper implements OpMapper<QdrantBaseOp,QdrantSpace> {
private static final Logger logger = LogManager.getLogger(QdrantOpMapper.class);
private final QdrantDriverAdapter adapter;
@ -42,11 +46,13 @@ public class QdrantOpMapper implements OpMapper<QdrantBaseOp<?>> {
/**
* Given an instance of a {@link ParsedOp} returns the appropriate {@link QdrantBaseOpDispenser} subclass
*
* @param op The {@link ParsedOp} to be evaluated
* @param op
* The {@link ParsedOp} to be evaluated
* @param spaceInitF
* @return The correct {@link QdrantBaseOpDispenser} subclass based on the op type
*/
@Override
public OpDispenser<? extends QdrantBaseOp<?>> apply(ParsedOp op) {
public OpDispenser<QdrantBaseOp> apply(ParsedOp op, LongFunction<QdrantSpace> spaceInitF) {
TypeAndTarget<QdrantOpType, String> typeAndTarget = op.getTypeAndTarget(
QdrantOpType.class,
String.class,
@ -73,4 +79,5 @@ public class QdrantOpMapper implements OpMapper<QdrantBaseOp<?>> {
// "mapping parsed op " + op);
};
}
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.qdrant;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -40,9 +41,8 @@ import java.time.Duration;
* @see <a href="https://github.com/qdrant/java-client">Qdrant Java client</a>
* @see <a href="https://github.com/qdrant/qdrant/blob/master/docs/grpc/docs.md">Qdrant GRPC docs</a>
*/
public class QdrantSpace implements AutoCloseable {
public class QdrantSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(QdrantSpace.class);
private final String name;
private final NBConfiguration cfg;
protected QdrantClient client;
@ -51,11 +51,10 @@ public class QdrantSpace implements AutoCloseable {
* Create a new QdrantSpace Object which stores all stateful contextual information needed to interact
* with the Qdrant database instance.
*
* @param name The name of this space
* @param cfg The configuration ({@link NBConfiguration}) for this nb run
*/
public QdrantSpace(String name, NBConfiguration cfg) {
this.name = name;
public QdrantSpace(long idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
}
@ -95,7 +94,7 @@ public class QdrantSpace implements AutoCloseable {
);
logger.info("{}: Creating new Qdrant Client with (masked) token [{}], uri/endpoint [{}]",
this.name, QdrantAdapterUtils.maskDigits(requiredToken), cfg.get("uri").toString());
getName(), QdrantAdapterUtils.maskDigits(requiredToken), cfg.get("uri").toString());
return new QdrantClient(builder.build());
}

View File

@ -41,7 +41,7 @@ import java.util.function.LongFunction;
import static io.qdrant.client.ConditionFactory.*;
public abstract class QdrantBaseOpDispenser<T> extends BaseOpDispenser<QdrantBaseOp<T>, QdrantSpace> {
public abstract class QdrantBaseOpDispenser<T> extends BaseOpDispenser<QdrantBaseOp, QdrantSpace> {
protected final LongFunction<QdrantSpace> qdrantSpaceFunction;
protected final LongFunction<QdrantClient> clientFunction;

View File

@ -17,19 +17,20 @@
package io.nosqlbench.adapter.s4j;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@Service(value = DriverAdapter.class, selector = "s4j")
public class S4JDriverAdapter extends BaseDriverAdapter<S4JOp, S4JSpace> {
@ -40,14 +41,14 @@ public class S4JDriverAdapter extends BaseDriverAdapter<S4JOp, S4JSpace> {
}
@Override
public OpMapper<S4JOp> getOpMapper() {
StringDriverSpaceCache<? extends S4JSpace> spaceCache = getSpaceCache();
public OpMapper<S4JOp,S4JSpace> getOpMapper() {
ConcurrentSpaceCache<? extends S4JSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new S4JOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends S4JSpace> getSpaceInitializer(NBConfiguration cfg) {
public LongFunction<S4JSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new S4JSpace(s, cfg);
}

View File

@ -19,6 +19,8 @@ package io.nosqlbench.adapter.s4j;
import io.nosqlbench.adapter.s4j.dispensers.MessageConsumerOpDispenser;
import io.nosqlbench.adapter.s4j.dispensers.MessageProducerOpDispenser;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
@ -29,24 +31,27 @@ import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4JOpMapper implements OpMapper<S4JOp> {
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class S4JOpMapper implements OpMapper<S4JOp,S4JSpace> {
private final static Logger logger = LogManager.getLogger(S4JOpMapper.class);
private final NBConfiguration cfg;
private final StringDriverSpaceCache<? extends S4JSpace> spaceCache;
private final ConcurrentSpaceCache<? extends S4JSpace> spaceCache;
private final DriverAdapter adapter;
public S4JOpMapper(DriverAdapter adapter, NBConfiguration cfg, StringDriverSpaceCache<? extends S4JSpace> spaceCache) {
public S4JOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<? extends S4JSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends S4JOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
S4JSpace s4jSpace = spaceCache.get(spaceName);
public OpDispenser<S4JOp> apply(ParsedOp op, LongFunction<S4JSpace> spaceInitF) {
int spaceIdx = op.getStaticConfigOr("space", 0);
S4JSpace s4jSpace = spaceCache.get(spaceIdx);
/*
* If the user provides a body element, then they want to provide the JSON or

View File

@ -20,6 +20,7 @@ import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.*;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -41,11 +42,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class S4JSpace implements AutoCloseable {
public class S4JSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(S4JSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
// - Each S4J space currently represents a number of JMS connections (\"num_conn\" NB CLI parameter);
@ -113,8 +113,8 @@ public class S4JSpace implements AutoCloseable {
private final MutablePair<Boolean, String> largePayloadSimPair = MutablePair.of(false, null);
public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
public S4JSpace(long idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
this.pulsarSvcUrl = cfg.get("service_url");
@ -399,14 +399,14 @@ public class S4JSpace implements AutoCloseable {
public String getConnLvlJmsContextIdentifier(int jmsConnSeqNum) {
return String.join(
"::",
this.spaceName,
getName(),
StringUtils.join("conn-", jmsConnSeqNum));
}
public String getSessionLvlJmsContextIdentifier(int jmsConnSeqNum, int jmsSessionSeqNum) {
return String.join(
"::",
this.spaceName,
getName(),
StringUtils.join("conn-", jmsConnSeqNum),
StringUtils.join("session-", jmsSessionSeqNum));
}

View File

@ -22,7 +22,7 @@ import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponent;
@ -34,7 +34,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -47,14 +48,14 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
}
@Override
public OpMapper<StdoutOp> getOpMapper() {
StringDriverSpaceCache<? extends StdoutSpace> ctxCache = getSpaceCache();
public OpMapper<StdoutOp,StdoutSpace> getOpMapper() {
ConcurrentSpaceCache<StdoutSpace> ctxCache = getSpaceCache();
return new StdoutOpMapper(this, ctxCache);
}
@Override
public Function<String, ? extends StdoutSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new StdoutSpace(cfg);
public LongFunction<StdoutSpace> getSpaceInitializer(NBConfiguration cfg) {
return (idx) -> new StdoutSpace(idx, cfg);
}
@Override
@ -68,7 +69,7 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
Set<String> activeBindingNames = new LinkedHashSet<>(opsDocList.getDocBindings().keySet());
if (activeBindingNames.size()==0) {
if (activeBindingNames.isEmpty()) {
logger.warn("Unable to synthesize op for driver=" + this.getAdapterName() + " with zero bindings.");
return List.of();
}
@ -89,7 +90,7 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
})
.collect(Collectors.toSet());
if (filteredBindingNames.size() == 0) {
if (filteredBindingNames.isEmpty()) {
logger.warn("Unable to synthesize op for driver="+getAdapterName()+" when " + activeBindingNames.size()+"/"+activeBindingNames.size() + " bindings were filtered out with bindings=" + bindings);
return List.of();
}

View File

@ -19,26 +19,26 @@ package io.nosqlbench.adapter.stdout;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class StdoutOpMapper implements OpMapper<StdoutOp> {
public class StdoutOpMapper implements OpMapper<StdoutOp,StdoutSpace> {
private final StringDriverSpaceCache<? extends StdoutSpace> ctxcache;
private final ConcurrentSpaceCache<? extends StdoutSpace> spaceCache;
private final DriverAdapter adapter;
public StdoutOpMapper(DriverAdapter adapter, StringDriverSpaceCache<? extends StdoutSpace> ctxcache) {
this.ctxcache = ctxcache;
public StdoutOpMapper(DriverAdapter adapter, ConcurrentSpaceCache<? extends StdoutSpace> spaceCache) {
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<StdoutOp> apply(ParsedOp op) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<StdoutSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
return new StdoutOpDispenser(adapter,op,ctxfunc);
public OpDispenser<StdoutOp> apply(ParsedOp op, LongFunction<StdoutSpace> spaceInitF) {
return new StdoutOpDispenser(adapter,op,adapter.getSpaceFunc(op));
}
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.stdout;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -25,12 +26,13 @@ import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.io.Writer;
public class StdoutSpace {
public class StdoutSpace extends BaseSpace {
Writer writer;
private PrintWriter console;
public StdoutSpace(NBConfiguration cfg) {
public StdoutSpace(long idx, NBConfiguration cfg) {
super(idx);
String filename = cfg.get("filename");
this.writer = createPrintWriter(filename);
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -30,13 +31,14 @@ import java.io.PrintWriter;
import java.io.Writer;
import java.net.Socket;
public class TcpClientAdapterSpace {
public class TcpClientAdapterSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
public TcpClientAdapterSpace(NBConfiguration config) {
public TcpClientAdapterSpace(long idx, NBConfiguration config) {
super(idx);
this.config = config;
this.writer = createPrintWriter();
}

View File

@ -35,6 +35,8 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {
@ -48,14 +50,14 @@ public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpCl
}
@Override
public OpMapper<TcpClientOp> getOpMapper() {
StringDriverSpaceCache<? extends TcpClientAdapterSpace> ctxCache = getSpaceCache();
return new TcpClientOpMapper(this,ctxCache);
public OpMapper<TcpClientOp,TcpClientAdapterSpace> getOpMapper() {
return new TcpClientOpMapper(this,getSpaceCache());
}
@Override
public Function<String, ? extends TcpClientAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpClientAdapterSpace(cfg);
public LongFunction<TcpClientAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (idx) -> new TcpClientAdapterSpace(idx,cfg);
}
@Override

View File

@ -18,26 +18,28 @@ package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpClientOpMapper implements OpMapper<TcpClientOp> {
public class TcpClientOpMapper implements OpMapper<TcpClientOp,TcpClientAdapterSpace> {
private final StringDriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache;
private final ConcurrentSpaceCache<TcpClientAdapterSpace> ctxcache;
private final TcpClientDriverAdapter adapter;
public TcpClientOpMapper(TcpClientDriverAdapter adapter, StringDriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache) {
public TcpClientOpMapper(TcpClientDriverAdapter adapter, ConcurrentSpaceCache<TcpClientAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<TcpClientOp> apply(ParsedOp op) {
public OpDispenser<TcpClientOp> apply(ParsedOp op, LongFunction<TcpClientAdapterSpace> spaceInitF) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpClientAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
LongFunction<TcpClientAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(cycle);
return new TcpClientOpDispenser(adapter,op,ctxfunc);
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -40,7 +41,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TcpServerAdapterSpace implements AutoCloseable {
public class TcpServerAdapterSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
@ -51,7 +52,8 @@ public class TcpServerAdapterSpace implements AutoCloseable {
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
public TcpServerAdapterSpace(NBConfiguration config) {
public TcpServerAdapterSpace(long idx, NBConfiguration config) {
super(idx);
this.config = config;
this.writer = createPrintWriter();
}

View File

@ -34,6 +34,8 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@Service(value= DriverAdapter.class, selector="tcpserver")
public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpServerAdapterSpace> implements SyntheticOpTemplateProvider {
@ -47,14 +49,13 @@ public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpSe
}
@Override
public OpMapper<TcpServerOp> getOpMapper() {
StringDriverSpaceCache<? extends TcpServerAdapterSpace> ctxCache = getSpaceCache();
return new TcpServerOpMapper(this,ctxCache);
public OpMapper<TcpServerOp,TcpServerAdapterSpace> getOpMapper() {
return new TcpServerOpMapper(this,getSpaceCache());
}
@Override
public Function<String, ? extends TcpServerAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpServerAdapterSpace(cfg);
public LongFunction<TcpServerAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (idx) -> new TcpServerAdapterSpace(idx,cfg);
}
@Override

View File

@ -18,28 +18,29 @@ package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.StringDriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpMapper implements OpMapper<TcpServerOp> {
public class TcpServerOpMapper implements OpMapper<TcpServerOp,TcpServerAdapterSpace> {
private final StringDriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache;
private final ConcurrentSpaceCache<TcpServerAdapterSpace> ctxcache;
private final TcpServerDriverAdapter adapter;
public TcpServerOpMapper(TcpServerDriverAdapter adapter, StringDriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache) {
public TcpServerOpMapper(TcpServerDriverAdapter adapter, ConcurrentSpaceCache<TcpServerAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<TcpServerOp> apply(ParsedOp op) {
public OpDispenser<TcpServerOp> apply(ParsedOp op, LongFunction<TcpServerAdapterSpace> spaceInitF) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpServerAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
LongFunction<TcpServerAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(cycle);
return new TcpServerOpDispenser(adapter,op,ctxfunc);
}
}

View File

@ -20,6 +20,7 @@ package io.nosqlbench.adapter.weaviate;
import static io.nosqlbench.adapter.weaviate.WeaviateAdapterUtils.WEAVIATE;
import java.util.function.Function;
import java.util.function.IntFunction;
import io.nosqlbench.adapter.weaviate.ops.WeaviateBaseOp;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
@ -44,8 +45,8 @@ public class WeaviateDriverAdapter extends BaseDriverAdapter<WeaviateBaseOp<?>,
}
@Override
public Function<String, ? extends WeaviateSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new WeaviateSpace(s, cfg);
public IntFunction<WeaviateSpace> getSpaceInitializer(NBConfiguration cfg) {
return (int s) -> new WeaviateSpace(s, cfg);
}
@Override

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.weaviate;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -31,6 +32,9 @@ import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
public class WeaviateOpMapper implements OpMapper<WeaviateBaseOp<?>> {
private static final Logger logger = LogManager.getLogger(WeaviateOpMapper.class);
private final WeaviateDriverAdapter adapter;
@ -45,15 +49,17 @@ public class WeaviateOpMapper implements OpMapper<WeaviateBaseOp<?>> {
}
/**
* Given an instance of a {@link ParsedOp} returns the appropriate
* {@link WeaviateBaseOpDispenser} subclass
*
* @param op The {@link ParsedOp} to be evaluated
* @return The correct {@link WeaviateBaseOpDispenser} subclass based on the op
* type
*/
* Given an instance of a {@link ParsedOp} returns the appropriate
* {@link WeaviateBaseOpDispenser} subclass
*
* @param op
* The {@link ParsedOp} to be evaluated
* @param spaceInitF
* @return The correct {@link WeaviateBaseOpDispenser} subclass based on the op
* type
*/
@Override
public OpDispenser<? extends WeaviateBaseOp<?>> apply(ParsedOp op) {
public OpDispenser<WeaviateBaseOp<?>> apply(ParsedOp op, LongFunction<? extends Space> spaceInitF) {
TypeAndTarget<WeaviateOpType, String> typeAndTarget = op.getTypeAndTarget(WeaviateOpType.class, String.class,
"type", "target");
logger.info(() -> "Using '" + typeAndTarget.enumId + "' op type for op template '" + op.getName() + "'");

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -45,9 +46,8 @@ import io.weaviate.client.v1.auth.exception.AuthException;
* @see <a href="https://github.com/weaviate/java-client">Weaviate Java
* client</a>
*/
public class WeaviateSpace implements AutoCloseable {
public class WeaviateSpace extends BaseSpace {
private final static Logger logger = LogManager.getLogger(WeaviateSpace.class);
private final String name;
private final NBConfiguration cfg;
protected WeaviateClient client;
@ -56,11 +56,10 @@ public class WeaviateSpace implements AutoCloseable {
* Create a new WeaviateSpace Object which stores all stateful contextual
* information needed to interact with the <b>Weaviate</b> database instance.
*
* @param name The name of this space
* @param cfg The configuration ({@link NBConfiguration}) for this nb run
*/
public WeaviateSpace(String name, NBConfiguration cfg) {
this.name = name;
public WeaviateSpace(int idx, NBConfiguration cfg) {
super(idx);
this.cfg = cfg;
}
@ -98,9 +97,9 @@ public class WeaviateSpace implements AutoCloseable {
}
logger.info("{}: Creating new Weaviate Client with (masked) token [{}], uri/endpoint [{}]",
this.name, WeaviateAdapterUtils.maskDigits(requiredToken), uri);
this.getName(), WeaviateAdapterUtils.maskDigits(requiredToken), uri);
Config config = new Config(scheme, uri);
if (cfg.getOptional("username").isPresent() && cfg.getOptional("password").isPresent()) {
return WeaviateAuthClient.clientPassword(config, cfg.getOptional("username").get(),
cfg.getOptional("password").get(), null);

View File

@ -25,35 +25,37 @@ import io.weaviate.client.WeaviateClient;
/**
* Delete a Weaviate collection.
*
*
* @see <a href=
* "https://weaviate.io/developers/weaviate/manage-data/collections#delete-a-collection">Delete
* Collection docs</a>.
* "https://weaviate.io/developers/weaviate/manage-data/collections#delete-a-collection">Delete
* Collection docs</a>.
* @see <a href=
* "https://weaviate.io/developers/weaviate/api/rest#tag/schema/delete/schema/{className}">Delete
* Collection REST API</a>.
* "https://weaviate.io/developers/weaviate/api/rest#tag/schema/delete/schema/{className}">Delete
* Collection REST API</a>.
*/
public class WeaviateDeleteCollectionOpDispenser extends WeaviateBaseOpDispenser<String> {
public class WeaviateDeleteCollectionOpDispenser extends WeaviateBaseOpDispenser<WeaviateDeleteCollectionOp> {
public WeaviateDeleteCollectionOpDispenser(WeaviateDriverAdapter adapter, ParsedOp op,
LongFunction<String> targetF) {
super(adapter, op, targetF);
}
public WeaviateDeleteCollectionOpDispenser(
WeaviateDriverAdapter adapter, ParsedOp op,
LongFunction<String> targetF) {
super(adapter, op, targetF);
}
@Override
public LongFunction<String> getParamFunc(LongFunction<WeaviateClient> clientF, ParsedOp op,
LongFunction<String> targetF) {
@Override
public LongFunction<WeaviateDeleteCollectionOp> getParamFunc(
LongFunction<WeaviateClient> clientF, ParsedOp op,
LongFunction<String> targetF
) {
// LongFunction<String> ebF = l -> targetF.apply(l);
//
// final LongFunction<String> lastF = ebF;
// return l -> lastF.apply(l);
return l -> targetF.apply(l);
}
return l -> new WeaviateDeleteCollectionOp(clientF.apply(l), targetF.apply(l));
}
@Override
public LongFunction<WeaviateBaseOp<String>> createOpFunc(LongFunction<String> paramF,
LongFunction<WeaviateClient> clientF, ParsedOp op, LongFunction<String> targetF) {
return l -> new WeaviateDeleteCollectionOp(clientF.apply(l), paramF.apply(l));
}
@Override
public LongFunction<WeaviateBaseOp<WeaviateDeleteCollectionOp>> createOpFunc(LongFunction<WeaviateDeleteCollectionOp> paramF, LongFunction<WeaviateClient> clientF, ParsedOp op, LongFunction<String> targetF) {
return l -> new WeaviateDeleteCollectionOp(clientF.apply(l), paramF.apply(l));
}
}