fix: lazily derive prepared statements from space

This commit is contained in:
Jonathan Shook 2024-10-30 13:12:30 -05:00
parent 37e570cc42
commit 1595458ad9
2 changed files with 35 additions and 16 deletions

View File

@ -30,6 +30,7 @@ import io.nosqlbench.adapter.cqld4.wrapper.Cqld4LoadBalancerObserver;
import io.nosqlbench.adapter.cqld4.wrapper.Cqld4SessionBuilder;
import io.nosqlbench.adapter.cqld4.wrapper.NodeSummary;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseSpace;
import io.nosqlbench.adapters.api.activityimpl.uniform.ConcurrentIndexCache;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.nbio.Content;
@ -47,12 +48,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public class Cqld4Space extends BaseSpace<Cqld4Space> {
private final static Logger logger = LogManager.getLogger(Cqld4Space.class);
CqlSession session;
private CqlSession session;
private ConcurrentIndexCache<PreparedStatement> preparedStmtCache = new ConcurrentIndexCache<>("pstmts");
public Cqld4Space(Cqld4DriverAdapter adapter, long spaceidx, NBConfiguration cfg) {
super(adapter,spaceidx);
@ -302,7 +305,6 @@ public class Cqld4Space extends BaseSpace<Cqld4Space> {
}
return Optional.of(mainloader);
}
}
public CqlSession getSession() {
@ -339,10 +341,19 @@ public class Cqld4Space extends BaseSpace<Cqld4Space> {
@Override
public void close() {
try {
this.preparedStmtCache.clear();
this.getSession().close();
} catch (Exception e) {
logger.warn("auto-closeable cql session threw exception in cql space(" + getName() + "): " + e);
throw e;
}
}
public PreparedStatement getOrCreatePreparedStatement(
int refkey,
LongFunction<PreparedStatement> psF
) {
PreparedStatement ps = preparedStmtCache.get(refkey, psF);
return ps;
}
}

View File

@ -69,30 +69,38 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
boundSession = getSessionFunc().apply(0);
try {
preparedStmt = boundSession.prepare(preparedQueryString);
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
LongFunction<PreparedStatement> prepareStatementF =
(long l) -> (sessionF.apply(l)).prepare(preparedQueryString);
LongFunction<? extends Cqld4Space> lookupSpaceF =
(long l) -> adapter.getSpaceCache().get(l);
int refKey = op.getRefKey();
LongFunction<PreparedStatement> cachedStatementF =
(long l) -> lookupSpaceF.apply(l).getOrCreatePreparedStatement(refKey,prepareStatementF);
LongFunction<Statement> boundStatementF =
(long l) -> cachedStatementF.apply(l).bind(fieldsF.apply(l));
return super.getEnhancedStmtFunc(boundStatementF, op);
} catch (Exception e) {
throw new OpConfigError(e + "( for statement '" + stmtTpl + "')");
}
LongFunction<Statement> boundStmtFunc = c -> {
Object[] apply = fieldsF.apply(c);
return preparedStmt.bind(apply);
};
return super.getEnhancedStmtFunc(boundStmtFunc, op);
}
@Override
public Cqld4CqlOp getOp(long cycle) {
BoundStatement boundStatement;
public Cqld4CqlPreparedStatement getOp(long cycle) {
BoundStatement stmt = (BoundStatement) stmtFunc.apply(cycle);
try {
boundStatement = (BoundStatement) stmtFunc.apply(cycle);
CqlSession session = (CqlSession) sessionF.apply(cycle);
return new Cqld4CqlPreparedStatement(
boundSession,
boundStatement,
sessionF.apply(cycle),
stmt,
getMaxPages(),
isRetryReplace(),
getMaxLwtRetries(),