jshook/nosqlbench-2068-opspaces (#2069)

* force mappers to use op-contextual space bindings

* move space de-init to owner (adapter component)

* make specific adapters use op-contextual spaces
This commit is contained in:
Jonathan Shook 2024-11-05 16:59:28 -06:00 committed by GitHub
parent ea7fceae49
commit 722e8b53d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 237 additions and 202 deletions

View File

@ -41,7 +41,7 @@ public class AmqpDriverAdapter extends BaseDriverAdapter<AmqpTimeTrackOp, AmqpSp
@Override @Override
public OpMapper<AmqpTimeTrackOp, AmqpSpace> getOpMapper() { public OpMapper<AmqpTimeTrackOp, AmqpSpace> getOpMapper() {
return new AmqpOpMapper(this, getConfiguration(), getSpaceCache()); return new AmqpOpMapper(this, getConfiguration());
} }
@Override @Override

View File

@ -37,12 +37,10 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp,AmqpSpace> {
private final static Logger logger = LogManager.getLogger(AmqpOpMapper.class); private final static Logger logger = LogManager.getLogger(AmqpOpMapper.class);
private final NBConfiguration cfg; private final NBConfiguration cfg;
private final ConcurrentSpaceCache<AmqpSpace> spaceCache; private final AmqpDriverAdapter adapter;
private final DriverAdapter adapter;
public AmqpOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<AmqpSpace> spaceCache) { public AmqpOpMapper(AmqpDriverAdapter adapter, NBConfiguration cfg) {
this.cfg = cfg; this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }
@ -50,7 +48,6 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp,AmqpSpace> {
public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction spaceInitF) { public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction spaceInitF) {
//public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction<AmqpTimeTrackOp> spaceInitF) { //public OpDispenser<AmqpTimeTrackOp> apply(ParsedOp op, LongFunction<AmqpTimeTrackOp> spaceInitF) {
int spaceName = op.getStaticConfigOr("space", 0); int spaceName = op.getStaticConfigOr("space", 0);
AmqpSpace amqpSpace = spaceCache.get(spaceName);
/* /*
* If the user provides a body element, then they want to provide the JSON or * If the user provides a body element, then they want to provide the JSON or
@ -65,9 +62,9 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp,AmqpSpace> {
return switch (opType.enumId) { return switch (opType.enumId) {
case AmqpMsgSender -> case AmqpMsgSender ->
new AmqpMsgSendOpDispenser(adapter, op, amqpSpace); new AmqpMsgSendOpDispenser(adapter, op);
case AmqpMsgReceiver -> case AmqpMsgReceiver ->
new AmqpMsgRecvOpDispenser(adapter, op, amqpSpace); new AmqpMsgRecvOpDispenser(adapter, op);
}; };
} }
} }

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.amqp.dispensers; package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import io.nosqlbench.adapter.amqp.AmqpDriverAdapter;
import io.nosqlbench.adapter.amqp.AmqpSpace; import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp; import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
@ -39,28 +40,21 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
protected final ParsedOp parsedOp; protected final ParsedOp parsedOp;
protected final AmqpAdapterMetrics amqpAdapterMetrics; protected final AmqpAdapterMetrics amqpAdapterMetrics;
protected final AmqpSpace amqpSpace;
protected final Map<String, String> amqpConfMap = new HashMap<>(); protected final Map<String, String> amqpConfMap = new HashMap<>();
protected final String exchangeType; protected String exchangeType;
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
final ParsedOp op, private boolean configured= false;
final AmqpSpace amqpSpace) {
protected AmqpBaseOpDispenser(final AmqpDriverAdapter adapter,
final ParsedOp op) {
super(adapter, op); super(adapter, op);
parsedOp = op; parsedOp = op;
this.amqpSpace = amqpSpace;
amqpAdapterMetrics = new AmqpAdapterMetrics(this, this); amqpAdapterMetrics = new AmqpAdapterMetrics(this, this);
amqpAdapterMetrics.initS4JAdapterInstrumentation(); amqpAdapterMetrics.initS4JAdapterInstrumentation();
amqpConfMap.putAll(amqpSpace.getAmqpClientConf().getConfigMap());
this.exchangeType = amqpSpace.getAmqpExchangeType();
this.amqpSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
this.amqpSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
} }
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) { protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
@ -73,14 +67,15 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) { protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
LongFunction<String> stringLongFunction; LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class) stringLongFunction = parsedOp.getAsFunctionOr(paramName, defaultValue);
.orElse(l -> defaultValue);
logger.info("{}: {}", paramName, stringLongFunction.apply(0)); logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction; return stringLongFunction;
} }
protected void declareExchange(Channel channel, String exchangeName, String exchangeType) { protected void declareExchange(long cycleNum, Channel channel, String exchangeName, String exchangeType) {
configureDispenser(cycleNum);
try { try {
// Declaring the same exchange multiple times on one channel is considered as a no-op // Declaring the same exchange multiple times on one channel is considered as a no-op
channel.exchangeDeclare(exchangeName, exchangeType); channel.exchangeDeclare(exchangeName, exchangeType);
@ -96,26 +91,40 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
} }
protected long getConnSeqNum(long cycle) { protected long getConnSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
configureDispenser(cycle);
return cycle % amqpSpace.getAmqpConnNum(); return cycle % amqpSpace.getAmqpConnNum();
} }
protected long getConnChannelSeqNum(long cycle) { protected long getConnChannelSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
configureDispenser(cycle);
return (cycle / amqpSpace.getAmqpConnNum()) % amqpSpace.getAmqpConnChannelNum(); return (cycle / amqpSpace.getAmqpConnNum()) % amqpSpace.getAmqpConnChannelNum();
} }
protected long getChannelExchangeSeqNum(long cycle) { protected long getChannelExchangeSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
configureDispenser(cycle);
return (cycle / ((long) amqpSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
amqpSpace.getAmqpConnChannelNum()) amqpSpace.getAmqpConnChannelNum())
) % amqpSpace.getAmqpChannelExchangeNum(); ) % amqpSpace.getAmqpChannelExchangeNum();
} }
protected String getEffectiveExchangeNameByCycle(long cycle) { protected String getEffectiveExchangeNameByCycle(long cycle) {
configureDispenser(cycle);
return getEffectiveExchangeName( return getEffectiveExchangeName(
cycle,
getConnSeqNum(cycle), getConnSeqNum(cycle),
getConnChannelSeqNum(cycle), getConnChannelSeqNum(cycle),
getChannelExchangeSeqNum(cycle)); getChannelExchangeSeqNum(cycle));
} }
protected String getEffectiveExchangeName(long connSeqNum, long channelSeqNum, long exchangeSeqNum) { protected String getEffectiveExchangeName(long cycleNum, long connSeqNum, long channelSeqNum,
long exchangeSeqNum) {
configureDispenser(cycleNum);
return String.format( return String.format(
"exchange-%d-%d-%d", "exchange-%d-%d-%d",
connSeqNum, connSeqNum,
@ -126,4 +135,17 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
public String getName() { public String getName() {
return "AmqpBaseOpDispenser"; return "AmqpBaseOpDispenser";
} }
synchronized void configureDispenser(long cycle) {
if (!configured) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
amqpConfMap.putAll(amqpSpace.getAmqpClientConf().getConfigMap());
this.exchangeType = amqpSpace.getAmqpExchangeType();
amqpSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
amqpSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
}
configured=true;
}
} }

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.amqp.AmqpDriverAdapter;
import io.nosqlbench.adapter.amqp.AmqpSpace; import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.ops.OpTimeTrackAmqpMsgRecvOp; import io.nosqlbench.adapter.amqp.ops.OpTimeTrackAmqpMsgRecvOp;
@ -35,14 +36,14 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
private final static Logger logger = LogManager.getLogger(AmqpMsgRecvOpDispenser.class); private final static Logger logger = LogManager.getLogger(AmqpMsgRecvOpDispenser.class);
private final LongFunction<String> bindingKeyFunc; private final LongFunction<String> bindingKeyFunc;
public AmqpMsgRecvOpDispenser(DriverAdapter adapter, public AmqpMsgRecvOpDispenser(AmqpDriverAdapter adapter,
ParsedOp op, ParsedOp op) {
AmqpSpace amqpSpace) { super(adapter, op);
super(adapter, op, amqpSpace);
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
} }
private long getExchangeQueueSeqNum(long cycle) { private long getExchangeQueueSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
return (cycle / ((long) amqpSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
amqpSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
amqpSpace.getAmqpChannelExchangeNum()) amqpSpace.getAmqpChannelExchangeNum())
@ -50,6 +51,8 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
} }
private long getQueueReceiverSeqNum(long cycle) { private long getQueueReceiverSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
return (cycle / ((long) amqpSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
amqpSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
amqpSpace.getAmqpChannelExchangeNum() * amqpSpace.getAmqpChannelExchangeNum() *
@ -91,6 +94,8 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
} }
private Channel getAmqpChannelForReceiver(long cycle) { private Channel getAmqpChannelForReceiver(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
long connSeqNum = getConnSeqNum(cycle); long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle);
@ -121,6 +126,8 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
@Override @Override
public AmqpTimeTrackOp getOp(long cycle) { public AmqpTimeTrackOp getOp(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
Channel channel = getAmqpChannelForReceiver(cycle); Channel channel = getAmqpChannelForReceiver(cycle);
if (channel == null) { if (channel == null) {
throw new AmqpAdapterUnexpectedException( throw new AmqpAdapterUnexpectedException(
@ -131,7 +138,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
} }
String exchangeName = getEffectiveExchangeNameByCycle(cycle); String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType()); declareExchange(cycle, channel, exchangeName, amqpSpace.getAmqpExchangeType());
boolean durable = true; boolean durable = true;
boolean exclusive = true; boolean exclusive = true;

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.amqp.AmqpDriverAdapter;
import io.nosqlbench.adapter.amqp.AmqpSpace; import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException; import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
@ -52,11 +53,9 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
private final LongFunction<String> routingKeyFunc; private final LongFunction<String> routingKeyFunc;
private final LongFunction<String> msgPayloadFunc; private final LongFunction<String> msgPayloadFunc;
public AmqpMsgSendOpDispenser(DriverAdapter adapter, public AmqpMsgSendOpDispenser(AmqpDriverAdapter adapter,
ParsedOp op, ParsedOp op) {
AmqpSpace amqpSpace) { super(adapter, op);
super(adapter, op, amqpSpace);
publisherConfirm = parsedOp publisherConfirm = parsedOp
.getOptionalStaticConfig("publisher_confirm", String.class) .getOptionalStaticConfig("publisher_confirm", String.class)
.filter(Predicate.not(String::isEmpty)) .filter(Predicate.not(String::isEmpty))
@ -87,6 +86,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
} }
private long getExchangeSenderSeqNum(long cycle) { private long getExchangeSenderSeqNum(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
return (cycle / ((long) amqpSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
amqpSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
amqpSpace.getAmqpChannelExchangeNum()) amqpSpace.getAmqpChannelExchangeNum())
@ -113,6 +113,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
} }
private Channel getAmqpChannelForSender(long cycle) { private Channel getAmqpChannelForSender(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
long connSeqNum = getConnSeqNum(cycle); long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle);
@ -176,6 +177,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
@Override @Override
public AmqpTimeTrackOp getOp(long cycle) { public AmqpTimeTrackOp getOp(long cycle) {
AmqpSpace amqpSpace = spaceF.apply(cycle);
String msgPayload = msgPayloadFunc.apply(cycle); String msgPayload = msgPayloadFunc.apply(cycle);
if (StringUtils.isBlank(msgPayload)) { if (StringUtils.isBlank(msgPayload)) {
throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!"); throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!");
@ -191,7 +193,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
} }
String exchangeName = getEffectiveExchangeNameByCycle(cycle); String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType()); declareExchange(cycle, channel, exchangeName, amqpSpace.getAmqpExchangeType());
return new OpTimeTrackAmqpMsgSendOp( return new OpTimeTrackAmqpMsgSendOp(
amqpAdapterMetrics, amqpAdapterMetrics,

View File

@ -58,7 +58,7 @@ public abstract class Cqld4BaseOpDispenser<T extends Cqld4BaseOp<?>> extends Bas
public Cqld4BaseOpDispenser(Cqld4DriverAdapter adapter, public Cqld4BaseOpDispenser(Cqld4DriverAdapter adapter,
ParsedOp op) { ParsedOp op) {
super((DriverAdapter<? extends T, ? extends Cqld4Space>) adapter, op); super((DriverAdapter<? extends T, ? extends Cqld4Space>) adapter, op);
this.sessionF = l -> adapter.getSpaceCache().get(l).getSession(); this.sessionF = l -> adapter.getSpaceFunc(op).apply(l).getSession();
this.maxpages = op.getStaticConfigOr("maxpages", 1); this.maxpages = op.getStaticConfigOr("maxpages", 1);
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false); this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1); this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);

View File

@ -76,7 +76,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
(long l) -> (sessionF.apply(l)).prepare(preparedQueryString); (long l) -> (sessionF.apply(l)).prepare(preparedQueryString);
LongFunction<? extends Cqld4Space> lookupSpaceF = LongFunction<? extends Cqld4Space> lookupSpaceF =
(long l) -> adapter.getSpaceCache().get(l); (long l) -> adapter.getSpaceFunc(op).apply(l);
int refKey = op.getRefKey(); int refKey = op.getRefKey();
LongFunction<PreparedStatement> cachedStatementF = LongFunction<PreparedStatement> cachedStatementF =

View File

@ -33,13 +33,9 @@ public abstract class Cqld4BaseOpMapper<T extends Cqld4BaseOp<?>> implements OpM
protected final static Logger logger = LogManager.getLogger(Cqld4BaseOpMapper.class); protected final static Logger logger = LogManager.getLogger(Cqld4BaseOpMapper.class);
protected final Cqld4DriverAdapter adapter; protected final Cqld4DriverAdapter adapter;
protected final LongFunction<Cqld4Space> spaceFunc;
protected final LongFunction<CqlSession> sessionFunc;
public Cqld4BaseOpMapper(Cqld4DriverAdapter adapter) { public Cqld4BaseOpMapper(Cqld4DriverAdapter adapter) {
this.adapter = adapter; this.adapter = adapter;
spaceFunc = l -> adapter.getSpaceCache().get(l);
sessionFunc = l -> spaceFunc.apply(l).getSession();
} }
@Override @Override

View File

@ -51,17 +51,17 @@ public class Cqld4CoreOpMapper extends Cqld4BaseOpMapper<Cqld4BaseOp<?>> {
*/ */
@Override @Override
public OpDispenser<Cqld4BaseOp<?>> apply(ParsedOp op, LongFunction<Cqld4Space> cqld4SpaceLongFunction) { public OpDispenser<Cqld4BaseOp<?>> apply(ParsedOp op, LongFunction<Cqld4Space> spaceF) {
CqlD4OpType opType = CqlD4OpType.prepared; CqlD4OpType opType = CqlD4OpType.prepared;
TypeAndTarget<CqlD4OpType, String> target = op.getTypeAndTarget(CqlD4OpType.class, String.class, "type", "stmt"); TypeAndTarget<CqlD4OpType, String> target = op.getTypeAndTarget(CqlD4OpType.class, String.class, "type", "stmt");
logger.info(() -> "Using " + target.enumId + " statement form for '" + op.getName() + "'"); logger.info(() -> "Using " + target.enumId + " statement form for '" + op.getName() + "'");
return (OpDispenser<Cqld4BaseOp<?>>) switch (target.enumId) { return (OpDispenser<Cqld4BaseOp<?>>) switch (target.enumId) {
case raw, simple, prepared, batch -> new Cqld4CqlOpMapper(adapter).apply(op, spaceFunc); case raw, simple, prepared, batch -> new Cqld4CqlOpMapper(adapter).apply(op, spaceF);
case gremlin -> new Cqld4GremlinOpMapper(adapter, target.targetFunction).apply(op, spaceFunc); case gremlin -> new Cqld4GremlinOpMapper(adapter, target.targetFunction).apply(op, spaceF);
case fluent -> new Cqld4FluentGraphOpMapper(adapter, target).apply(op, spaceFunc); case fluent -> new Cqld4FluentGraphOpMapper(adapter, target).apply(op, spaceF);
case rainbow -> case rainbow ->
new CqlD4RainbowTableMapper(adapter, sessionFunc, target.targetFunction).apply(op, spaceFunc); new CqlD4RainbowTableMapper(adapter, spaceF, target.targetFunction).apply(op, spaceF);
default -> throw new OpConfigError("Unsupported op type " + opType); default -> throw new OpConfigError("Unsupported op type " + opType);
// case sst -> new Cqld4SsTableMapper(adapter, sessionFunc, target.targetFunction).apply(op); // case sst -> new Cqld4SsTableMapper(adapter, sessionFunc, target.targetFunction).apply(op);
}; };

View File

@ -46,13 +46,13 @@ public class Cqld4CqlOpMapper extends Cqld4CqlBaseOpMapper<Cqld4CqlOp> {
return (OpDispenser<Cqld4CqlOp>) switch (target.enumId) { return (OpDispenser<Cqld4CqlOp>) switch (target.enumId) {
case raw -> { case raw -> {
CqlD4RawStmtMapper cqlD4RawStmtMapper = new CqlD4RawStmtMapper(adapter, target.targetFunction); CqlD4RawStmtMapper cqlD4RawStmtMapper = new CqlD4RawStmtMapper(adapter, target.targetFunction);
OpDispenser<Cqld4CqlSimpleStatement> apply = cqlD4RawStmtMapper.apply(op, spaceFunc); OpDispenser<Cqld4CqlSimpleStatement> apply = cqlD4RawStmtMapper.apply(op, spaceInitF);
yield apply; yield apply;
} }
case simple -> new CqlD4CqlSimpleStmtMapper(adapter, target.targetFunction).apply(op, spaceFunc); case simple -> new CqlD4CqlSimpleStmtMapper(adapter, target.targetFunction).apply(op, spaceInitF);
case prepared -> new CqlD4PreparedStmtMapper(adapter, target).apply(op, spaceFunc); case prepared -> new CqlD4PreparedStmtMapper(adapter, target).apply(op, spaceInitF);
case batch -> new CqlD4BatchStmtMapper(adapter, target).apply(op, spaceFunc); case batch -> new CqlD4BatchStmtMapper(adapter, target).apply(op, spaceInitF);
default -> default ->
throw new OpConfigError("Unsupported op type for CQL category of statement forms:" + target.enumId); throw new OpConfigError("Unsupported op type for CQL category of statement forms:" + target.enumId);
}; };

View File

@ -40,7 +40,9 @@ public class Cqld4GremlinOpMapper<CO extends Cqld4ScriptGraphOp> extends Cqld4Ba
@Override @Override
public Cqld4GremlinOpDispenser apply(ParsedOp op, LongFunction spaceInitF) { public Cqld4GremlinOpDispenser apply(ParsedOp op, LongFunction spaceInitF) {
return new Cqld4GremlinOpDispenser(adapter, sessionFunc, targetFunction, op); return new Cqld4GremlinOpDispenser(
adapter,
l -> adapter.getSpaceFunc(op).apply(l).getSession(), targetFunction, op);
} }
} }

View File

@ -42,7 +42,7 @@ public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoD
@Override @Override
public OpMapper<DynamoDBOp,DynamoDBSpace> getOpMapper() { public OpMapper<DynamoDBOp,DynamoDBSpace> getOpMapper() {
NBConfiguration adapterConfig = getConfiguration(); NBConfiguration adapterConfig = getConfiguration();
return new DynamoDBOpMapper(this, adapterConfig, getSpaceCache()); return new DynamoDBOpMapper(this, adapterConfig);
} }
@Override @Override

View File

@ -33,19 +33,18 @@ import java.util.function.LongFunction;
public class DynamoDBOpMapper implements OpMapper<DynamoDBOp,DynamoDBSpace> { public class DynamoDBOpMapper implements OpMapper<DynamoDBOp,DynamoDBSpace> {
private final NBConfiguration cfg; private final NBConfiguration cfg;
private final ConcurrentSpaceCache<DynamoDBSpace> cache;
private final DriverAdapter adapter; private final DriverAdapter adapter;
public DynamoDBOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<DynamoDBSpace> cache) { public DynamoDBOpMapper(DynamoDBDriverAdapter adapter, NBConfiguration cfg) {
this.cfg = cfg; this.cfg = cfg;
this.cache = cache;
this.adapter = adapter; this.adapter = adapter;
} }
@Override @Override
public OpDispenser<DynamoDBOp> apply(ParsedOp op, LongFunction<DynamoDBSpace> spaceInitF) { public OpDispenser<DynamoDBOp> apply(ParsedOp op, LongFunction<DynamoDBSpace> spaceInitF) {
int space = op.getStaticConfigOr("space", 0); int space = op.getStaticConfigOr("space", 0);
DynamoDB ddb = cache.get(space).getDynamoDB(); LongFunction<DynamoDBSpace> spaceFunc = adapter.getSpaceFunc(op);
DynamoDB ddb = spaceFunc.apply(space).getDynamoDB();
/* /*
* If the user provides a body element, then they want to provide the JSON or * If the user provides a body element, then they want to provide the JSON or

View File

@ -56,7 +56,7 @@ public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
@Override @Override
public OpMapper<HttpOp,HttpSpace> getOpMapper() { public OpMapper<HttpOp,HttpSpace> getOpMapper() {
NBConfiguration config = getConfiguration(); NBConfiguration config = getConfiguration();
return new HttpOpMapper(this, config, getSpaceCache()); return new HttpOpMapper(this, config);
} }
@Override @Override

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.http.core; package io.nosqlbench.adapter.http.core;
import io.nosqlbench.adapter.http.HttpDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser; import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper; import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache; import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentSpaceCache;
@ -29,19 +30,16 @@ import java.util.function.LongFunction;
public class HttpOpMapper implements OpMapper<HttpOp,HttpSpace> { public class HttpOpMapper implements OpMapper<HttpOp,HttpSpace> {
private final NBConfiguration cfg; private final NBConfiguration cfg;
private final ConcurrentSpaceCache<? extends HttpSpace> spaceCache;
private final DriverAdapter adapter; private final DriverAdapter adapter;
public HttpOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<HttpSpace> spaceCache) { public HttpOpMapper(HttpDriverAdapter adapter, NBConfiguration cfg) {
this.cfg = cfg; this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }
@Override @Override
public OpDispenser<HttpOp> apply(ParsedOp op, LongFunction<HttpSpace> spaceInitF) { public OpDispenser<HttpOp> apply(ParsedOp op, LongFunction<HttpSpace> spaceInitF) {
LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default"); LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default");
LongFunction<HttpSpace> spaceFunc = l -> spaceCache.get(l); return new HttpOpDispenser(adapter, spaceInitF, op);
return new HttpOpDispenser(adapter, spaceFunc, op);
} }
} }

View File

@ -49,8 +49,7 @@ public class HttpOpMapperTest {
HttpOpMapperTest.cfg = HttpSpace.getConfigModel().apply(Map.of()); HttpOpMapperTest.cfg = HttpSpace.getConfigModel().apply(Map.of());
HttpOpMapperTest.adapter = new HttpDriverAdapter(new TestComponent("parent","parent"), NBLabels.forKV()); HttpOpMapperTest.adapter = new HttpDriverAdapter(new TestComponent("parent","parent"), NBLabels.forKV());
HttpOpMapperTest.adapter.applyConfig(HttpOpMapperTest.cfg); HttpOpMapperTest.adapter.applyConfig(HttpOpMapperTest.cfg);
ConcurrentSpaceCache<HttpSpace> spaceCache = HttpOpMapperTest.adapter.getSpaceCache(); HttpOpMapperTest.mapper = new HttpOpMapper(HttpOpMapperTest.adapter, HttpOpMapperTest.cfg);
HttpOpMapperTest.mapper = new HttpOpMapper(HttpOpMapperTest.adapter, HttpOpMapperTest.cfg, spaceCache);
} }
private static ParsedOp parsedOpFor(final String yaml) { private static ParsedOp parsedOpFor(final String yaml) {

View File

@ -39,9 +39,8 @@ public class KafkaDriverAdapter extends BaseDriverAdapter<KafkaOp, KafkaSpace> {
@Override @Override
public OpMapper<KafkaOp,KafkaSpace> getOpMapper() { public OpMapper<KafkaOp,KafkaSpace> getOpMapper() {
ConcurrentSpaceCache<KafkaSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration(); NBConfiguration adapterConfig = getConfiguration();
return new KafkaOpMapper(this, adapterConfig, spaceCache); return new KafkaOpMapper(this, adapterConfig);
} }

View File

@ -36,11 +36,9 @@ public class KafkaOpMapper implements OpMapper<KafkaOp,KafkaSpace> {
private final static Logger logger = LogManager.getLogger(KafkaOpMapper.class); private final static Logger logger = LogManager.getLogger(KafkaOpMapper.class);
private final ConcurrentSpaceCache<KafkaSpace> spaceCache;
private final DriverAdapter<KafkaOp,KafkaSpace> adapter; private final DriverAdapter<KafkaOp,KafkaSpace> adapter;
public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<KafkaSpace> spaceCache) { public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg) {
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }

View File

@ -36,12 +36,9 @@ public class MongoOpMapper<MC extends MongoDirectCommandOp> implements OpMapper<
private final MongodbDriverAdapter adapter; private final MongodbDriverAdapter adapter;
private final NBConfiguration configuration; private final NBConfiguration configuration;
private final ConcurrentSpaceCache<MongoSpace> spaceCache;
public MongoOpMapper(MongodbDriverAdapter adapter, NBConfiguration cfg, public MongoOpMapper(MongodbDriverAdapter adapter, NBConfiguration cfg) {
ConcurrentSpaceCache<MongoSpace> spaceCache) {
this.configuration = cfg; this.configuration = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }

View File

@ -41,7 +41,7 @@ public class MongodbDriverAdapter extends BaseDriverAdapter<MongoOp<?>, MongoSpa
@Override @Override
public OpMapper<MongoOp<?>,MongoSpace> getOpMapper() { public OpMapper<MongoOp<?>,MongoSpace> getOpMapper() {
return new MongoOpMapper(this, getConfiguration(), getSpaceCache()); return new MongoOpMapper(this, getConfiguration());
} }
@Override @Override

View File

@ -38,7 +38,7 @@ public class Neo4JDriverAdapter extends BaseDriverAdapter<Neo4JBaseOp, Neo4JSpac
@Override @Override
public OpMapper<Neo4JBaseOp,Neo4JSpace> getOpMapper() { public OpMapper<Neo4JBaseOp,Neo4JSpace> getOpMapper() {
return new Neo4JOpMapper(this, getSpaceCache()); return new Neo4JOpMapper(this);
} }
@Override @Override

View File

@ -33,7 +33,7 @@ import java.util.function.LongFunction;
public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp,Neo4JSpace> { public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp,Neo4JSpace> {
private final Neo4JDriverAdapter adapter; private final Neo4JDriverAdapter adapter;
public Neo4JOpMapper(Neo4JDriverAdapter adapter, ConcurrentSpaceCache<Neo4JSpace> cache) { public Neo4JOpMapper(Neo4JDriverAdapter adapter) {
this.adapter = adapter; this.adapter = adapter;
} }

View File

@ -44,9 +44,8 @@ public class PulsarDriverAdapter extends BaseDriverAdapter<PulsarOp, PulsarSpace
@Override @Override
public OpMapper<PulsarOp,PulsarSpace> getOpMapper() { public OpMapper<PulsarOp,PulsarSpace> getOpMapper() {
ConcurrentSpaceCache<PulsarSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration(); NBConfiguration adapterConfig = getConfiguration();
return new PulsarOpMapper(this, adapterConfig, spaceCache); return new PulsarOpMapper(this, adapterConfig);
} }
@Override @Override

View File

@ -37,20 +37,18 @@ public class PulsarOpMapper implements OpMapper<PulsarOp,PulsarSpace> {
private final static Logger logger = LogManager.getLogger(PulsarOpMapper.class); private final static Logger logger = LogManager.getLogger(PulsarOpMapper.class);
private final NBConfiguration cfg; private final NBConfiguration cfg;
private final ConcurrentSpaceCache<PulsarSpace> spaceCache;
private final PulsarDriverAdapter adapter; private final PulsarDriverAdapter adapter;
public PulsarOpMapper(PulsarDriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<PulsarSpace> spaceCache) { public PulsarOpMapper(PulsarDriverAdapter adapter, NBConfiguration cfg) {
this.cfg = cfg; this.cfg = cfg;
this.spaceCache = spaceCache; this.adapter = adapter;
this.adapter = adapter;
} }
@Override @Override
public OpDispenser<PulsarOp> apply(ParsedOp op, LongFunction<PulsarSpace> spaceInitF) { public OpDispenser<PulsarOp> apply(ParsedOp op, LongFunction<PulsarSpace> spaceInitF) {
int spaceName = op.getStaticConfigOr("space", 0); int spaceName = op.getStaticConfigOr("space", 0);
// PulsarSpace pulsarSpace = spaceCache.get(spaceName); // PulsarSpace pulsarSpace = spaceCache.get(spaceName);
PulsarSpace pulsarSpace = adapter.getSpaceCache().get(spaceName); PulsarSpace pulsarSpace = adapter.getSpaceFunc(op).apply(spaceName);
/* /*
* If the user provides a body element, then they want to provide the JSON or * If the user provides a body element, then they want to provide the JSON or

View File

@ -42,9 +42,7 @@ public class S4JDriverAdapter extends BaseDriverAdapter<S4JOp, S4JSpace> {
@Override @Override
public OpMapper<S4JOp,S4JSpace> getOpMapper() { public OpMapper<S4JOp,S4JSpace> getOpMapper() {
ConcurrentSpaceCache<? extends S4JSpace> spaceCache = getSpaceCache(); return new S4JOpMapper(this);
NBConfiguration adapterConfig = getConfiguration();
return new S4JOpMapper(this, adapterConfig, spaceCache);
} }
@Override @Override

View File

@ -37,20 +37,14 @@ public class S4JOpMapper implements OpMapper<S4JOp,S4JSpace> {
private final static Logger logger = LogManager.getLogger(S4JOpMapper.class); private final static Logger logger = LogManager.getLogger(S4JOpMapper.class);
private final NBConfiguration cfg; private final S4JDriverAdapter adapter;
private final ConcurrentSpaceCache<? extends S4JSpace> spaceCache;
private final DriverAdapter adapter;
public S4JOpMapper(DriverAdapter adapter, NBConfiguration cfg, ConcurrentSpaceCache<? extends S4JSpace> spaceCache) { public S4JOpMapper(S4JDriverAdapter adapter) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }
@Override @Override
public OpDispenser<S4JOp> apply(ParsedOp op, LongFunction<S4JSpace> spaceInitF) { public OpDispenser<S4JOp> apply(ParsedOp op, LongFunction<S4JSpace> spaceInitF) {
int spaceIdx = op.getStaticConfigOr("space", 0);
S4JSpace s4jSpace = spaceCache.get(spaceIdx);
/* /*
* If the user provides a body element, then they want to provide the JSON or * If the user provides a body element, then they want to provide the JSON or
@ -65,9 +59,9 @@ public class S4JOpMapper implements OpMapper<S4JOp,S4JSpace> {
return switch (opType.enumId) { return switch (opType.enumId) {
case MessageProduce -> case MessageProduce ->
new MessageProducerOpDispenser(adapter, op, opType.targetFunction, s4jSpace); new MessageProducerOpDispenser(adapter, op, opType.targetFunction);
case MessageConsume -> case MessageConsume ->
new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, s4jSpace); new MessageConsumerOpDispenser(adapter, op, opType.targetFunction);
}; };
} }
} }

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.s4j.dispensers; package io.nosqlbench.adapter.s4j.dispensers;
import io.nosqlbench.adapter.s4j.S4JDriverAdapter;
import io.nosqlbench.adapter.s4j.S4JSpace; import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.MessageConsumerOp; import io.nosqlbench.adapter.s4j.ops.MessageConsumerOp;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil; import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
@ -64,13 +65,14 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
// Setting them here will allow scenario-specific customer configurations. At the moment, only the // Setting them here will allow scenario-specific customer configurations. At the moment, only the
// DLT related settings are supported // DLT related settings are supported
private final Map<String, Object> combinedS4jConfigObjMap = new HashMap<>(); private final Map<String, Object> combinedS4jConfigObjMap = new HashMap<>();
private final HashMap<String, String> stmtLvlConsumerConfRawMap;
private boolean configured = false;
public MessageConsumerOpDispenser(DriverAdapter adapter, public MessageConsumerOpDispenser(S4JDriverAdapter adapter,
ParsedOp op, ParsedOp op,
LongFunction<String> tgtNameFunc, LongFunction<String> tgtNameFunc) {
S4JSpace s4jSpace) { super(adapter, op, tgtNameFunc);
super(adapter, op, tgtNameFunc, s4jSpace);
this.blockingMsgRecv = this.blockingMsgRecv =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.BLOCKING_MSG_RECV.label, Boolean.FALSE); parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.BLOCKING_MSG_RECV.label, Boolean.FALSE);
@ -109,6 +111,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
"consumer.deadLetterPolicy", "consumer.deadLetterPolicy",
"consumer.negativeAckRedeliveryBackoff", "consumer.negativeAckRedeliveryBackoff",
"consumer.ackTimeoutRedeliveryBackoff"}; "consumer.ackTimeoutRedeliveryBackoff"};
HashMap<String, String> stmtLvlConsumerConfRawMap = new HashMap<>(); HashMap<String, String> stmtLvlConsumerConfRawMap = new HashMap<>();
for (String confKey : stmtLvlConsumerConfKeyNameList ) { for (String confKey : stmtLvlConsumerConfKeyNameList ) {
String confVal = parsedOp.getStaticConfigOr(confKey, ""); String confVal = parsedOp.getStaticConfigOr(confKey, "");
@ -116,13 +119,17 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
StringUtils.substringAfter(confKey, "consumer."), StringUtils.substringAfter(confKey, "consumer."),
confVal); confVal);
} }
this.stmtLvlConsumerConfRawMap = stmtLvlConsumerConfRawMap;
this.combinedS4jConfigObjMap.putAll(
s4jSpace.getS4JClientConf().mergeExtraConsumerConfig(stmtLvlConsumerConfRawMap));
} }
@Override @Override
public MessageConsumerOp getOp(long cycle) { public MessageConsumerOp getOp(long cycle) {
S4JSpace s4jSpace = s4jSpaceF.apply(cycle);
if (!configured) {
s4jSpace.getS4JClientConf().mergeExtraConsumerConfig(stmtLvlConsumerConfRawMap);
}
configured=true;
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap); S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransact = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle); boolean commitTransact = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
@ -130,7 +137,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
Destination destination; Destination destination;
try { try {
destination = getJmsDestination( destination = getJmsDestination(
s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle)); cycle, s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle));
} }
catch (JMSRuntimeException jmsRuntimeException) { catch (JMSRuntimeException jmsRuntimeException) {
throw new RuntimeException("Unable to create the JMS destination!"); throw new RuntimeException("Unable to create the JMS destination!");
@ -139,6 +146,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
JMSConsumer jmsConsumer; JMSConsumer jmsConsumer;
try { try {
jmsConsumer = getJmsConsumer( jmsConsumer = getJmsConsumer(
cycle,
s4JJMSContextWrapper, s4JJMSContextWrapper,
destination, destination,
destType, destType,
@ -157,7 +165,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
return new MessageConsumerOp( return new MessageConsumerOp(
s4jAdapterMetrics, s4jAdapterMetrics,
s4jSpace, this.s4jSpaceF.apply(cycle),
jmsContext, jmsContext,
destination, destination,
asyncAPI, asyncAPI,

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.s4j.dispensers; package io.nosqlbench.adapter.s4j.dispensers;
import io.nosqlbench.adapter.s4j.S4JDriverAdapter;
import io.nosqlbench.adapter.s4j.S4JSpace; import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException; import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
@ -52,11 +53,10 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final LongFunction<String> msgBodyRawStrFunc; private final LongFunction<String> msgBodyRawStrFunc;
private final LongFunction<String> msgTypeFunc; private final LongFunction<String> msgTypeFunc;
public MessageProducerOpDispenser(DriverAdapter adapter, public MessageProducerOpDispenser(S4JDriverAdapter adapter,
ParsedOp op, ParsedOp op,
LongFunction<String> tgtNameFunc, LongFunction<String> tgtNameFunc) {
S4JSpace s4jSpace) { super(adapter, op, tgtNameFunc);
super(adapter, op, tgtNameFunc, s4jSpace);
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM); this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
@ -130,7 +130,8 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
return message; return message;
} }
private Message updateMessageHeaders(S4JJMSContextWrapper s4JJMSContextWrapper, Message message, String msgType, String msgHeaderRawJsonStr) throws JMSException { private Message updateMessageHeaders(long curCycle, S4JJMSContextWrapper s4JJMSContextWrapper, Message message,
String msgType, String msgHeaderRawJsonStr) throws JMSException {
int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP)); int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP));
// Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs // Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs
@ -176,7 +177,8 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
if (value != null) { if (value != null) {
String destType = StringUtils.substringBefore(value, ':'); String destType = StringUtils.substringBefore(value, ':');
String destName = StringUtils.substringAfter(value, ':'); String destName = StringUtils.substringAfter(value, ':');
outMessage.setJMSReplyTo(getJmsDestination(s4JJMSContextWrapper,false, destType, destName)); outMessage.setJMSReplyTo(getJmsDestination(curCycle,s4JJMSContextWrapper,false, destType,
destName));
} }
} }
// Ignore these headers - handled by S4J API automatically // Ignore these headers - handled by S4J API automatically
@ -274,6 +276,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
@Override @Override
public MessageProducerOp getOp(long cycle) { public MessageProducerOp getOp(long cycle) {
S4JSpace s4jSpace = s4jSpaceF.apply(cycle);
String destName = destNameStrFunc.apply(cycle); String destName = destNameStrFunc.apply(cycle);
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle); String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle); String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
@ -299,7 +302,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
Destination destination; Destination destination;
try { try {
destination = getJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName); destination = getJmsDestination(cycle, s4JJMSContextWrapper, temporaryDest, destType, destName);
} }
catch (JMSRuntimeException jmsRuntimeException) { catch (JMSRuntimeException jmsRuntimeException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!"); throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!");
@ -307,7 +310,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
JMSProducer producer; JMSProducer producer;
try { try {
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI); producer = getJmsProducer(cycle, s4JJMSContextWrapper, asyncAPI);
int priority = NumberUtils.toInt(jmsMsgPriorityStr); int priority = NumberUtils.toInt(jmsMsgPriorityStr);
assert (priority >= 0 && priority <= 9); assert (priority >= 0 && priority <= 9);
producer.setPriority(priority); producer.setPriority(priority);
@ -343,7 +346,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
// ----------------------- // -----------------------
// //
try { try {
message = updateMessageHeaders(s4JJMSContextWrapper, message, jmsMsgType, jmsMsgHeaderRawJsonStr); message = updateMessageHeaders(cycle, s4JJMSContextWrapper, message, jmsMsgType, jmsMsgHeaderRawJsonStr);
} }
catch (JMSException jmsException) { catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Failed to set create a JMS message and set its payload!"); throw new S4JAdapterUnexpectedException("Failed to set create a JMS message and set its payload!");

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.s4j.dispensers; package io.nosqlbench.adapter.s4j.dispensers;
import com.datastax.oss.pulsar.jms.PulsarJMSContext; import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.S4JDriverAdapter;
import io.nosqlbench.adapter.s4j.S4JSpace; import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.S4JOp; import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapter.s4j.util.*; import io.nosqlbench.adapter.s4j.util.*;
@ -40,7 +41,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
private static final Logger logger = LogManager.getLogger("PulsarBaseOpDispenser"); private static final Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
protected final ParsedOp parsedOp; protected final ParsedOp parsedOp;
protected final S4JSpace s4jSpace;
protected final S4JAdapterMetrics s4jAdapterMetrics; protected final S4JAdapterMetrics s4jAdapterMetrics;
// Doc-level parameter: temporary_dest (default: false) // Doc-level parameter: temporary_dest (default: false)
@ -57,16 +57,15 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
protected final int totalThreadNum; protected final int totalThreadNum;
protected final long totalCycleNum; protected final long totalCycleNum;
protected final LongFunction<S4JSpace> s4jSpaceF;
protected S4JBaseOpDispenser(DriverAdapter adapter, protected S4JBaseOpDispenser(S4JDriverAdapter adapter,
ParsedOp op, ParsedOp op,
LongFunction<String> destNameStrFunc, LongFunction<String> destNameStrFunc) {
S4JSpace s4jSpace) {
super(adapter, op); super(adapter, op);
this.parsedOp = op; this.parsedOp = op;
this.s4jSpace = s4jSpace;
this.s4jAdapterMetrics = new S4JAdapterMetrics(this); this.s4jAdapterMetrics = new S4JAdapterMetrics(this);
s4jAdapterMetrics.initS4JAdapterInstrumentation(); s4jAdapterMetrics.initS4JAdapterInstrumentation();
@ -83,10 +82,9 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class)); this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class)); this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
s4jSpace.setTotalCycleNum(totalCycleNum); this.s4jSpaceF = adapter.getSpaceFunc(op);
} }
public S4JSpace getS4jSpace() { return s4jSpace; }
public S4JAdapterMetrics getS4jAdapterMetrics() { return s4jAdapterMetrics; } public S4JAdapterMetrics getS4jAdapterMetrics() { return s4jAdapterMetrics; }
protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) { protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
@ -179,6 +177,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
long curCycle, long curCycle,
Map<String, Object> overrideS4jConfMap) Map<String, Object> overrideS4jConfMap)
{ {
S4JSpace s4jSpace = s4jSpaceF.apply(curCycle);
int totalConnNum = s4jSpace.getMaxNumConn(); int totalConnNum = s4jSpace.getMaxNumConn();
int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn(); int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn();
@ -220,11 +219,14 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/ */
public Destination getJmsDestination( public Destination getJmsDestination(
long curCycle,
S4JJMSContextWrapper s4JJMSContextWrapper, S4JJMSContextWrapper s4JJMSContextWrapper,
boolean tempDest, boolean tempDest,
String destType, String destType,
String destName) throws JMSRuntimeException String destName
) throws JMSRuntimeException
{ {
S4JSpace s4jSpace = s4jSpaceF.apply(curCycle);
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier(); String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
@ -269,9 +271,11 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
* If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it * If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it
*/ */
public JMSProducer getJmsProducer( public JMSProducer getJmsProducer(
long curCycle,
S4JJMSContextWrapper s4JJMSContextWrapper, S4JJMSContextWrapper s4JJMSContextWrapper,
boolean asyncApi) throws JMSException boolean asyncApi) throws JMSException
{ {
S4JSpace s4jSpace = s4jSpaceF.apply(curCycle);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
S4JSpace.JMSGenObjCacheKey producerCacheKey = S4JSpace.JMSGenObjCacheKey producerCacheKey =
new S4JSpace.JMSGenObjCacheKey( new S4JSpace.JMSGenObjCacheKey(
@ -298,6 +302,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
* If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it * If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it
*/ */
public JMSConsumer getJmsConsumer( public JMSConsumer getJmsConsumer(
long curCycle,
S4JJMSContextWrapper s4JJMSContextWrapper, S4JJMSContextWrapper s4JJMSContextWrapper,
Destination destination, Destination destination,
String destType, String destType,
@ -318,6 +323,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
String.join("::", String.join("::",
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer")); getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer"));
S4JSpace s4jSpace = s4jSpaceF.apply(curCycle);
return s4jSpace.getJmsConsumer(consumerCacheKey, () -> { return s4jSpace.getJmsConsumer(consumerCacheKey, () -> {
JMSConsumer jmsConsumer; JMSConsumer jmsConsumer;
@ -360,6 +366,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
// - session mode is equal to "SESSION_TRANSACTED" // - session mode is equal to "SESSION_TRANSACTED"
// - "txn_batch_num" has been reached since last reset // - "txn_batch_num" has been reached since last reset
boolean commitTransaction = (Session.SESSION_TRANSACTED == jmsSessionMode) && (0 < txnBatchNum); boolean commitTransaction = (Session.SESSION_TRANSACTED == jmsSessionMode) && (0 < txnBatchNum);
S4JSpace s4jSpace = s4jSpaceF.apply(curCycleNum);
if (commitTransaction) { if (commitTransaction) {
int txnBatchTackingCnt = s4jSpace.getTxnBatchTrackingCnt(); int txnBatchTackingCnt = s4jSpace.getTxnBatchTrackingCnt();

View File

@ -49,8 +49,7 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
@Override @Override
public OpMapper<StdoutOp,StdoutSpace> getOpMapper() { public OpMapper<StdoutOp,StdoutSpace> getOpMapper() {
ConcurrentSpaceCache<StdoutSpace> ctxCache = getSpaceCache(); return new StdoutOpMapper(this);
return new StdoutOpMapper(this, ctxCache);
} }
@Override @Override

View File

@ -28,11 +28,9 @@ import java.util.function.LongFunction;
public class StdoutOpMapper implements OpMapper<StdoutOp,StdoutSpace> { public class StdoutOpMapper implements OpMapper<StdoutOp,StdoutSpace> {
private final ConcurrentSpaceCache<? extends StdoutSpace> spaceCache;
private final DriverAdapter adapter; private final DriverAdapter adapter;
public StdoutOpMapper(DriverAdapter adapter, ConcurrentSpaceCache<? extends StdoutSpace> spaceCache) { public StdoutOpMapper(DriverAdapter adapter) {
this.spaceCache = spaceCache;
this.adapter = adapter; this.adapter = adapter;
} }

View File

@ -51,7 +51,7 @@ public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpCl
@Override @Override
public OpMapper<TcpClientOp,TcpClientAdapterSpace> getOpMapper() { public OpMapper<TcpClientOp,TcpClientAdapterSpace> getOpMapper() {
return new TcpClientOpMapper(this,getSpaceCache()); return new TcpClientOpMapper(this);
} }
@Override @Override

View File

@ -26,19 +26,17 @@ import java.util.function.LongFunction;
public class TcpClientOpMapper implements OpMapper<TcpClientOp,TcpClientAdapterSpace> { public class TcpClientOpMapper implements OpMapper<TcpClientOp,TcpClientAdapterSpace> {
private final ConcurrentSpaceCache<TcpClientAdapterSpace> ctxcache;
private final TcpClientDriverAdapter adapter; private final TcpClientDriverAdapter adapter;
public TcpClientOpMapper(TcpClientDriverAdapter adapter, ConcurrentSpaceCache<TcpClientAdapterSpace> ctxcache) { public TcpClientOpMapper(TcpClientDriverAdapter adapter) {
this.ctxcache = ctxcache;
this.adapter = adapter; this.adapter = adapter;
} }
@Override @Override
public OpDispenser<TcpClientOp> apply(ParsedOp op, LongFunction<TcpClientAdapterSpace> spaceInitF) { public OpDispenser<TcpClientOp> apply(ParsedOp op, LongFunction<TcpClientAdapterSpace> spaceInitF) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default"); LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpClientAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(cycle); LongFunction<TcpClientAdapterSpace> ctxfunc = adapter.getSpaceFunc(op);
return new TcpClientOpDispenser(adapter,op,ctxfunc); return new TcpClientOpDispenser(adapter,op,ctxfunc);
} }

View File

@ -49,7 +49,7 @@ public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpSe
@Override @Override
public OpMapper<TcpServerOp,TcpServerAdapterSpace> getOpMapper() { public OpMapper<TcpServerOp,TcpServerAdapterSpace> getOpMapper() {
return new TcpServerOpMapper(this,getSpaceCache()); return new TcpServerOpMapper(this);
} }
@Override @Override

View File

@ -26,19 +26,17 @@ import java.util.function.LongFunction;
public class TcpServerOpMapper implements OpMapper<TcpServerOp,TcpServerAdapterSpace> { public class TcpServerOpMapper implements OpMapper<TcpServerOp,TcpServerAdapterSpace> {
private final ConcurrentSpaceCache<TcpServerAdapterSpace> ctxcache;
private final TcpServerDriverAdapter adapter; private final TcpServerDriverAdapter adapter;
public TcpServerOpMapper(TcpServerDriverAdapter adapter, ConcurrentSpaceCache<TcpServerAdapterSpace> ctxcache) { public TcpServerOpMapper(TcpServerDriverAdapter adapter) {
this.ctxcache = ctxcache;
this.adapter = adapter; this.adapter = adapter;
} }
@Override @Override
public OpDispenser<TcpServerOp> apply(ParsedOp op, LongFunction<TcpServerAdapterSpace> spaceInitF) { public OpDispenser<TcpServerOp> apply(ParsedOp op, LongFunction<TcpServerAdapterSpace> spaceInitF) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default"); LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpServerAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(cycle); LongFunction<TcpServerAdapterSpace> ctxfunc = adapter.getSpaceFunc(op);
return new TcpServerOpDispenser(adapter,op,ctxfunc); return new TcpServerOpDispenser(adapter,op,ctxfunc);
} }
} }

View File

@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
/** /**
* See {@link OpDispenser} for details on how to use this type. * See {@link OpDispenser} for details on how to use this type.
@ -65,6 +66,7 @@ public abstract class BaseOpDispenser<OP extends CycleOp<?>,SPACE extends Space>
private Timer errorTimer; private Timer errorTimer;
private final String[] timerStarts; private final String[] timerStarts;
private final String[] timerStops; private final String[] timerStops;
protected LongFunction<? extends SPACE> spaceF;
/** /**
* package imports used with "verifiers" or "expected-result" are accumulated here * package imports used with "verifiers" or "expected-result" are accumulated here
@ -83,6 +85,7 @@ public abstract class BaseOpDispenser<OP extends CycleOp<?>,SPACE extends Space>
super(adapter); super(adapter);
opName = op.getName(); opName = op.getName();
this.adapter = adapter; this.adapter = adapter;
this.spaceF = adapter.getSpaceFunc(op);
labels = op.getLabels(); labels = op.getLabels();
this.timerStarts = op.takeOptionalStaticValue(START_TIMERS, String.class) this.timerStarts = op.takeOptionalStaticValue(START_TIMERS, String.class)

View File

@ -82,7 +82,7 @@ import java.util.function.LongFunction;
* generally something that implements {@link Runnable}. * generally something that implements {@link Runnable}.
*/ */
public interface OpMapper<OPTYPE extends CycleOp<?>, SPACETYPE extends Space> public interface OpMapper<OPTYPE extends CycleOp<?>, SPACETYPE extends Space>
extends BiFunction<ParsedOp, LongFunction<SPACETYPE>, OpDispenser<OPTYPE>> { extends BiFunction<ParsedOp, LongFunction<SPACETYPE>, OpDispenser<? extends OPTYPE>> {
/** /**
* Interrogate the parsed command, and provide a new * Interrogate the parsed command, and provide a new

View File

@ -35,12 +35,15 @@ import java.util.function.LongFunction;
import java.util.function.LongToIntFunction; import java.util.function.LongToIntFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> extends NBBaseComponent implements DriverAdapter<R, S>, NBConfigurable, NBReconfigurable { public abstract class BaseDriverAdapter<RESULT
extends CycleOp<?>, SPACE extends Space> extends NBBaseComponent
implements DriverAdapter<RESULT, SPACE>, NBConfigurable, NBReconfigurable {
private final static Logger logger = LogManager.getLogger("ADAPTER"); private final static Logger logger = LogManager.getLogger("ADAPTER");
private ConcurrentSpaceCache<S> spaceCache; private ConcurrentSpaceCache<SPACE> spaceCache;
private NBConfiguration cfg; private NBConfiguration cfg;
private LongFunction<S> spaceF; private LongFunction<SPACE> spaceF;
public BaseDriverAdapter(NBComponent parentComponent, NBLabels labels) { public BaseDriverAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels); super(parentComponent, labels);
@ -57,9 +60,9 @@ public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> e
public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() { public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() {
List<Function<Map<String, Object>, Map<String, Object>>> mappers = new ArrayList<>(); List<Function<Map<String, Object>, Map<String, Object>>> mappers = new ArrayList<>();
List<Function<Map<String, Object>, Map<String, Object>>> stmtRemappers = List<Function<Map<String, Object>, Map<String, Object>>> stmtRemappers =
getOpStmtRemappers().stream() getOpStmtRemappers().stream()
.map(m -> new FieldDestructuringMapper("stmt", m)) .map(m -> new FieldDestructuringMapper("stmt", m))
.collect(Collectors.toList()); .collect(Collectors.toList());
mappers.addAll(stmtRemappers); mappers.addAll(stmtRemappers);
mappers.addAll(getOpFieldRemappers()); mappers.addAll(getOpFieldRemappers());
@ -129,10 +132,9 @@ public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> e
return List.of(); return List.of();
} }
@Override private final synchronized ConcurrentSpaceCache<SPACE> getSpaceCache() {
public final synchronized ConcurrentSpaceCache<S> getSpaceCache() {
if (spaceCache == null) { if (spaceCache == null) {
spaceCache = new ConcurrentSpaceCache<S>(this,getSpaceInitializer(getConfiguration())); spaceCache = new ConcurrentSpaceCache<SPACE>(this, getSpaceInitializer(getConfiguration()));
} }
return spaceCache; return spaceCache;
} }
@ -161,40 +163,40 @@ public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> e
@Override @Override
public NBConfigModel getConfigModel() { public NBConfigModel getConfigModel() {
return ConfigModel.of(BaseDriverAdapter.class) return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("alias")) .add(Param.optional("alias"))
.add(Param.optional("labels",String.class,"Labels which will apply to metrics and annotations for this activity only")) .add(Param.optional("labels", String.class, "Labels which will apply to metrics and annotations for this activity only"))
.add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used")) .add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form")) .add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form"))
.add(Param.optional("tags", String.class, "tags to be used to filter operations")) .add(Param.optional("tags", String.class, "tags to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration")) .add(Param.defaultTo("errors", "stop", "error handler configuration"))
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool")) .add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("stride").setRegex("\\d+")) .add(Param.optional("stride").setRegex("\\d+"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second")) .add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use")) .add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use"))
.add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times")) .add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second")) .add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.add(Param.optional("seq", String.class, "sequencing algorithm")) .add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class)) .add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class)) .add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|emit|none)")) .add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|emit|none)"))
.add(Param.optional("maxtries", Integer.class)) .add(Param.optional("maxtries", Integer.class))
.asReadOnly(); .asReadOnly();
} }
@Override @Override
public NBConfigModel getReconfigModel() { public NBConfigModel getReconfigModel() {
return ConfigModel.of(BaseDriverAdapter.class) return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool")) .add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second")) .add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second")) .add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.asReadOnly(); .asReadOnly();
} }
@Override @Override
public LongFunction<S> getSpaceFunc(ParsedOp pop) { public LongFunction<SPACE> getSpaceFunc(ParsedOp pop) {
Optional<LongFunction<Object>> spaceFuncTest = pop.getAsOptionalFunction("space",Object.class); Optional<LongFunction<Object>> spaceFuncTest = pop.getAsOptionalFunction("space", Object.class);
LongToIntFunction cycleToSpaceF; LongToIntFunction cycleToSpaceF;
if (spaceFuncTest.isEmpty()) { if (spaceFuncTest.isEmpty()) {
cycleToSpaceF = (long l) -> 0; cycleToSpaceF = (long l) -> 0;
@ -203,7 +205,7 @@ public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> e
if (example instanceof Number n) { if (example instanceof Number n) {
logger.trace("mapping space indirectly with Number type"); logger.trace("mapping space indirectly with Number type");
LongFunction<Number> numberF = pop.getAsRequiredFunction("space", Number.class); LongFunction<Number> numberF = pop.getAsRequiredFunction("space", Number.class);
cycleToSpaceF= l -> numberF.apply(l).intValue(); cycleToSpaceF = l -> numberF.apply(l).intValue();
} else { } else {
logger.trace("mapping space indirectly through hash table to index pool"); logger.trace("mapping space indirectly through hash table to index pool");
LongFunction<?> sourceF = pop.getAsRequiredFunction("space", String.class); LongFunction<?> sourceF = pop.getAsRequiredFunction("space", String.class);
@ -212,7 +214,22 @@ public abstract class BaseDriverAdapter<R extends CycleOp<?>, S extends Space> e
cycleToSpaceF = l -> wrapper.mapKeyToIndex(namerF.apply(l)); cycleToSpaceF = l -> wrapper.mapKeyToIndex(namerF.apply(l));
} }
} }
ConcurrentSpaceCache<S> spaceCache1 = getSpaceCache();
ConcurrentSpaceCache<SPACE> spaceCache1 = getSpaceCache();
return l -> spaceCache1.get(cycleToSpaceF.applyAsInt(l)); return l -> spaceCache1.get(cycleToSpaceF.applyAsInt(l));
} }
@Override
public void beforeDetach() {
for (SPACE space : this.getSpaceCache()) {
try {
// TODO This should be invariant now, remove conditional?
space.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + this.getAdapterName() + ", space=" + space.getName() + ": " + e, e);
}
}
super.beforeDetach();
}
} }

View File

@ -131,22 +131,7 @@ public interface DriverAdapter<OPTYPE extends CycleOp<?>, SPACETYPE extends Spac
return List.of(f -> f); return List.of(f -> f);
} }
/** // ConcurrentSpaceCache<SPACETYPE> getSpaceCache();
* The cache of all objects needed within a single instance
* of a DriverAdapter which are not operations. These are generally
* things needed by operations, or things needed during the
* construction of operations.
*
* See {@link ConcurrentIndexCache} for details on when and how to use this function.
*
* <p>During Adapter Initialization, Op Mapping, Op Synthesis, or Op Execution,
* you may need access to the objects in (the or a) space cache. You can build the
* type of context needed and then provide this function to provide new instances
* when needed.</p>
*
* @return A cache of named objects
*/
ConcurrentSpaceCache<SPACETYPE> getSpaceCache();
/** /**
* This method allows each driver adapter to create named state which is automatically * This method allows each driver adapter to create named state which is automatically
@ -209,5 +194,27 @@ public interface DriverAdapter<OPTYPE extends CycleOp<?>, SPACETYPE extends Spac
return this.getClass().getAnnotation(Service.class).maturity(); return this.getClass().getAnnotation(Service.class).maturity();
} }
LongFunction<SPACETYPE> getSpaceFunc(ParsedOp pop); /**
* <p>The cache of all objects needed within a single instance
* of a DriverAdapter which are not operations. These are generally
* things needed by operations, or things needed during the
* construction of operations.</p>
*
* <p>During Adapter Initialization, Op Mapping, Op Synthesis, or Op Execution,
* you may need access to the objects in (the or a) space cache. You can build the
* type of context needed and then provide this function to provide new instances
* when needed.</p>
*
* <p>The function returned by this method is specialized to the space mapping
* logic in the op template. Specifically, it uses whatever binding is set on a given
* op template for the <em>space</em> op field. If none are provided, then this
* becomes a short-circuit for the default '0'. If a non-numeric binding is provided,
* then an interstitial mapping is added which converts the {@link Object#toString()}
* value to ordinals using a hash map. This is less optimal by far than using
* any binding that produces a {@link Number}.</p>
*
* @return A cache of named objects
*/
public LongFunction<SPACETYPE> getSpaceFunc(ParsedOp pop);
} }

View File

@ -265,16 +265,8 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
for (Map.Entry<String, DriverAdapter<CycleOp<?>,Space>> entry : adapters.entrySet()) { for (Map.Entry<String, DriverAdapter<CycleOp<?>,Space>> entry : adapters.entrySet()) {
String adapterName = entry.getKey(); String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue(); DriverAdapter<?, ?> adapter = entry.getValue();
for (Space space : adapter.getSpaceCache()) { if (adapter instanceof AutoCloseable autoCloseable) {
if (space instanceof AutoCloseable autoCloseable) { adapter.close();
try {
// TODO This should be invariant now, remove conditional?
autoCloseable.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + adapterName + ", space=" + space.getName() + ": " + e, e);
}
}
} }
} }
} }