mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-01-11 16:32:01 -06:00
add cqld4 prepared statement binding diagnostics for errors
This commit is contained in:
parent
07339c4879
commit
5aaae15592
@ -0,0 +1,155 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
|
||||
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import com.datastax.oss.driver.api.core.data.CqlDuration;
|
||||
import com.datastax.oss.driver.api.core.data.TupleValue;
|
||||
import com.datastax.oss.driver.api.core.data.UdtValue;
|
||||
import com.datastax.oss.driver.api.core.type.*;
|
||||
import com.datastax.oss.driver.internal.core.type.PrimitiveType;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalTime;
|
||||
import java.util.*;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
import static com.datastax.oss.protocol.internal.ProtocolConstants.DataType.*;
|
||||
|
||||
/**
|
||||
* This should only be used when there is an exception thrown by some higher level logic.
|
||||
* The purpose of this class is to do a more thorough job of checking each step of binding
|
||||
* values to a prepared statement, and to provide useful feedback to the user
|
||||
* explaining more specifically what the problem was that caused the original error to be thrown.
|
||||
*/
|
||||
public class CQLD4PreparedStmtDiagnostics {
|
||||
private final static Logger logger = LogManager.getLogger(CQLD4PreparedStmtDiagnostics.class);
|
||||
|
||||
public static BoundStatement bindStatement(BoundStatement bound, CqlIdentifier colname, Object colval, DataType coltype) {
|
||||
|
||||
return switch (coltype) {
|
||||
case PrimitiveType pt -> switch (pt.getProtocolCode()) {
|
||||
case CUSTOM -> throw new OpConfigError("Error with Custom DataType");
|
||||
case ASCII, VARCHAR -> bound.setString(colname, (String) colval);
|
||||
case BIGINT, COUNTER ->bound.setLong(colname, (long) colval);
|
||||
case BLOB -> bound.setByteBuffer(colname, (ByteBuffer) colval);
|
||||
case BOOLEAN -> bound.setBoolean(colname, (boolean) colval);
|
||||
case DECIMAL ->bound.setBigDecimal(colname, (BigDecimal) colval);
|
||||
case DOUBLE ->bound.setDouble(colname, (double) colval);
|
||||
case FLOAT ->bound.setFloat(colname, (float) colval);
|
||||
case INT, SMALLINT, TINYINT -> bound.setInt(colname, (int) colval);
|
||||
case TIMESTAMP -> bound.setInstant(colname, (Instant) colval);
|
||||
case TIMEUUID, UUID ->bound.setUuid(colname, (UUID) colval);
|
||||
case VARINT ->bound.setBigInteger(colname, (BigInteger) colval);
|
||||
case INET ->bound.setInetAddress(colname, (InetAddress) colval);
|
||||
case DATE ->bound.setLocalDate(colname, (LocalDate) colval);
|
||||
case TIME -> bound.setLocalTime(colname, (LocalTime) colval);
|
||||
case DURATION ->bound.setCqlDuration(colname, (CqlDuration) colval);
|
||||
case LIST -> bound.setList(colname,(List)colval,((List)colval).get(0).getClass());
|
||||
case MAP -> {
|
||||
Map map = (Map) colval;
|
||||
Set<Map.Entry> entries = map.entrySet();
|
||||
Optional<Map.Entry> first = entries.stream().findFirst();
|
||||
if (first.isPresent()) {
|
||||
yield bound.setMap(colname,map,first.get().getKey().getClass(),first.get().getValue().getClass());
|
||||
} else {
|
||||
yield bound.setMap(colname,map,Object.class,Object.class);
|
||||
}
|
||||
}
|
||||
case SET -> {
|
||||
Set set = (Set)colval;
|
||||
Optional first = set.stream().findFirst();
|
||||
if (first.isPresent()) {
|
||||
yield bound.setSet(colname,set,first.get().getClass());
|
||||
} else {
|
||||
yield bound.setSet(colname,Set.of(),Object.class);
|
||||
}
|
||||
}
|
||||
case UDT -> {
|
||||
UdtValue udt = (UdtValue) colval;
|
||||
yield bound.setUdtValue(colname, udt);
|
||||
}
|
||||
case TUPLE -> {
|
||||
TupleValue tuple = (TupleValue) colval;
|
||||
yield bound.setTupleValue(colname,tuple);
|
||||
}
|
||||
default-> throw new RuntimeException("Unknown CQL type for diagnostic (type:'" + coltype +"',code:'" + coltype.getProtocolCode()+"'");
|
||||
};
|
||||
|
||||
default -> throw new IllegalStateException("Unexpected value: " + coltype);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
public static Cqld4CqlOp rebindWithDiagnostics(
|
||||
PreparedStatement preparedStmt,
|
||||
LongFunction<Object[]> fieldsF,
|
||||
long cycle,
|
||||
Exception exception
|
||||
) {
|
||||
logger.error(exception);
|
||||
ColumnDefinitions defs = preparedStmt.getVariableDefinitions();
|
||||
Object[] values = fieldsF.apply(cycle);
|
||||
if (defs.size() != values.length) {
|
||||
throw new OpConfigError("There are " + defs.size() + " anchors in statement '" + preparedStmt.getQuery() + "'" +
|
||||
"but only " + values.length + " values were provided.");
|
||||
}
|
||||
|
||||
BoundStatement bound = preparedStmt.bind();
|
||||
int idx = 0;
|
||||
for (int i = 0; i < defs.size(); i++) {
|
||||
Object value = values[i];
|
||||
ColumnDefinition def = defs.get(i);
|
||||
CqlIdentifier defname = def.getName();
|
||||
DataType type = def.getType();
|
||||
try {
|
||||
bound = CQLD4PreparedStmtDiagnostics.bindStatement(bound, defname, value, type);
|
||||
} catch (ClassCastException cce) {
|
||||
String errormsg = String.format(
|
||||
"Unable to bind column '%s' to cql type '%s' with value '%s' (class '%s')",
|
||||
defname,
|
||||
type.asCql(false, false),
|
||||
value,
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
logger.error(errormsg);
|
||||
throw new OpConfigError(errormsg, cce);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// If we got here, then either someone used the diagnostic binder where they shouldn't (It's SLOW,
|
||||
// and there was no exception which prompted a retry with this diagnostic) OR
|
||||
// There was an error detected in the caller and it was not seen here where it should have been
|
||||
// reproduced.
|
||||
throw new OpConfigError("The diagnostic binder was called but no error was found. This is a logic error.");
|
||||
}
|
||||
|
||||
}
|
@ -25,53 +25,72 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
|
||||
private final static Logger logger = LogManager.getLogger(Cqld4PreparedStmtDispenser.class);
|
||||
|
||||
private final RSProcessors processors;
|
||||
private final LongFunction<Statement> stmtFunc;
|
||||
private final ParsedTemplate stmtTpl;
|
||||
private final LongFunction<Object[]> fieldsF;
|
||||
private PreparedStatement preparedStmt;
|
||||
private CqlSession boundSession;
|
||||
|
||||
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp cmd, ParsedTemplate stmtTpl, RSProcessors processors) {
|
||||
super(sessionFunc, cmd);
|
||||
if (cmd.isDynamic("space")) {
|
||||
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op, ParsedTemplate stmtTpl, RSProcessors processors) {
|
||||
super(sessionFunc, op);
|
||||
if (op.isDynamic("space")) {
|
||||
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
|
||||
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
|
||||
}
|
||||
this.processors = processors;
|
||||
this.stmtTpl = stmtTpl;
|
||||
stmtFunc = createStmtFunc(cmd);
|
||||
this.fieldsF = getFieldsFunction(op);
|
||||
stmtFunc = createStmtFunc(fieldsF, op);
|
||||
}
|
||||
|
||||
protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
|
||||
|
||||
private LongFunction<Object[]> getFieldsFunction(ParsedOp op) {
|
||||
LongFunction<Object[]> varbinder;
|
||||
varbinder = cmd.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
|
||||
varbinder = op.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
|
||||
return varbinder;
|
||||
}
|
||||
|
||||
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {
|
||||
|
||||
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
|
||||
boundSession = getSessionFunc().apply(0);
|
||||
preparedStmt = boundSession.prepare(preparedQueryString);
|
||||
|
||||
LongFunction<Statement> boundStmtFunc = c -> {
|
||||
Object[] apply = varbinder.apply(c);
|
||||
Object[] apply = fieldsF.apply(c);
|
||||
return preparedStmt.bind(apply);
|
||||
};
|
||||
return super.getEnhancedStmtFunc(boundStmtFunc, cmd);
|
||||
return super.getEnhancedStmtFunc(boundStmtFunc, op);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlOp apply(long value) {
|
||||
public Cqld4CqlOp apply(long cycle) {
|
||||
|
||||
return new Cqld4CqlPreparedStatement(
|
||||
boundSession,
|
||||
(BoundStatement) stmtFunc.apply(value),
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
processors
|
||||
);
|
||||
BoundStatement boundStatement;
|
||||
try {
|
||||
boundStatement = (BoundStatement) stmtFunc.apply(cycle);
|
||||
return new Cqld4CqlPreparedStatement(
|
||||
boundSession,
|
||||
boundStatement,
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
processors
|
||||
);
|
||||
} catch (Exception exception) {
|
||||
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
|
||||
preparedStmt,
|
||||
fieldsF,
|
||||
cycle,
|
||||
exception
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user