merge fixes

This commit is contained in:
Jonathan Shook
2022-06-02 17:15:13 -05:00
7 changed files with 231 additions and 32 deletions

View File

@@ -1,3 +1,19 @@
<!--
~ Copyright (c) 2022 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -64,7 +80,7 @@
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.13.0</version>
<version>4.14.1</version>
</dependency>
<dependency>

View File

@@ -0,0 +1,154 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
import com.datastax.oss.driver.api.core.type.*;
import com.datastax.oss.driver.internal.core.type.PrimitiveType;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.*;
import java.util.function.LongFunction;
import static com.datastax.oss.protocol.internal.ProtocolConstants.DataType.*;
/**
* This should only be used when there is an exception thrown by some higher level logic.
* The purpose of this class is to do a more thorough job of checking each step of binding
* values to a prepared statement, and to provide useful feedback to the user
* explaining more specifically what the problem was that caused the original error to be thrown.
*/
public class CQLD4PreparedStmtDiagnostics {
private final static Logger logger = LogManager.getLogger(CQLD4PreparedStmtDiagnostics.class);
public static BoundStatement bindStatement(BoundStatement bound, CqlIdentifier colname, Object colval, DataType coltype) {
if (coltype instanceof PrimitiveType pt) {
return switch (pt.getProtocolCode()) {
case CUSTOM -> throw new OpConfigError("Error with Custom DataType");
case ASCII, VARCHAR -> bound.setString(colname, (String) colval);
case BIGINT, COUNTER -> bound.setLong(colname, (long) colval);
case BLOB -> bound.setByteBuffer(colname, (ByteBuffer) colval);
case BOOLEAN -> bound.setBoolean(colname, (boolean) colval);
case DECIMAL -> bound.setBigDecimal(colname, (BigDecimal) colval);
case DOUBLE -> bound.setDouble(colname, (double) colval);
case FLOAT -> bound.setFloat(colname, (float) colval);
case INT, SMALLINT, TINYINT -> bound.setInt(colname, (int) colval);
case TIMESTAMP -> bound.setInstant(colname, (Instant) colval);
case TIMEUUID, UUID -> bound.setUuid(colname, (UUID) colval);
case VARINT -> bound.setBigInteger(colname, (BigInteger) colval);
case INET -> bound.setInetAddress(colname, (InetAddress) colval);
case DATE -> bound.setLocalDate(colname, (LocalDate) colval);
case TIME -> bound.setLocalTime(colname, (LocalTime) colval);
case DURATION -> bound.setCqlDuration(colname, (CqlDuration) colval);
case LIST -> bound.setList(colname, (List) colval, ((List) colval).get(0).getClass());
case MAP -> {
Map map = (Map) colval;
Set<Map.Entry> entries = map.entrySet();
Optional<Map.Entry> first = entries.stream().findFirst();
if (first.isPresent()) {
yield bound.setMap(colname, map, first.get().getKey().getClass(), first.get().getValue().getClass());
} else {
yield bound.setMap(colname, map, Object.class, Object.class);
}
}
case SET -> {
Set set = (Set) colval;
Optional first = set.stream().findFirst();
if (first.isPresent()) {
yield bound.setSet(colname, set, first.get().getClass());
} else {
yield bound.setSet(colname, Set.of(), Object.class);
}
}
case UDT -> {
UdtValue udt = (UdtValue) colval;
yield bound.setUdtValue(colname, udt);
}
case TUPLE -> {
TupleValue tuple = (TupleValue) colval;
yield bound.setTupleValue(colname, tuple);
}
default -> throw new RuntimeException("Unknown CQL type for diagnostic (type:'" + coltype + "',code:'" + coltype.getProtocolCode() + "'");
};
}
throw new IllegalStateException("Unexpected value: " + coltype);
}
public static Cqld4CqlOp rebindWithDiagnostics(
PreparedStatement preparedStmt,
LongFunction<Object[]> fieldsF,
long cycle,
Exception exception
) {
logger.error(exception);
ColumnDefinitions defs = preparedStmt.getVariableDefinitions();
Object[] values = fieldsF.apply(cycle);
if (defs.size() != values.length) {
throw new OpConfigError("There are " + defs.size() + " anchors in statement '" + preparedStmt.getQuery() + "'" +
"but only " + values.length + " values were provided.");
}
BoundStatement bound = preparedStmt.bind();
int idx = 0;
for (int i = 0; i < defs.size(); i++) {
Object value = values[i];
ColumnDefinition def = defs.get(i);
CqlIdentifier defname = def.getName();
DataType type = def.getType();
try {
bound = CQLD4PreparedStmtDiagnostics.bindStatement(bound, defname, value, type);
} catch (ClassCastException cce) {
String errormsg = String.format(
"Unable to bind column '%s' to cql type '%s' with value '%s' (class '%s')",
defname,
type.asCql(false, false),
value,
value.getClass().getCanonicalName()
);
logger.error(errormsg);
throw new OpConfigError(errormsg, cce);
}
}
// If we got here, then either someone used the diagnostic binder where they shouldn't (It's SLOW,
// and there was no exception which prompted a retry with this diagnostic) OR
// There was an error detected in the caller and it was not seen here where it should have been
// reproduced.
throw new OpConfigError("The diagnostic binder was called but no error was found. This is a logic error.");
}
}

View File

@@ -25,53 +25,72 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
private final static Logger logger = LogManager.getLogger(Cqld4PreparedStmtDispenser.class);
private final RSProcessors processors;
private final LongFunction<Statement> stmtFunc;
private final ParsedTemplate stmtTpl;
private final LongFunction<Object[]> fieldsF;
private PreparedStatement preparedStmt;
private CqlSession boundSession;
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp cmd, ParsedTemplate stmtTpl, RSProcessors processors) {
super(sessionFunc, cmd);
if (cmd.isDynamic("space")) {
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op, ParsedTemplate stmtTpl, RSProcessors processors) {
super(sessionFunc, op);
if (op.isDynamic("space")) {
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
}
this.processors = processors;
this.stmtTpl = stmtTpl;
stmtFunc = createStmtFunc(cmd);
this.fieldsF = getFieldsFunction(op);
stmtFunc = createStmtFunc(fieldsF, op);
}
protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
private LongFunction<Object[]> getFieldsFunction(ParsedOp op) {
LongFunction<Object[]> varbinder;
varbinder = cmd.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
varbinder = op.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
return varbinder;
}
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
boundSession = getSessionFunc().apply(0);
preparedStmt = boundSession.prepare(preparedQueryString);
LongFunction<Statement> boundStmtFunc = c -> {
Object[] apply = varbinder.apply(c);
Object[] apply = fieldsF.apply(c);
return preparedStmt.bind(apply);
};
return super.getEnhancedStmtFunc(boundStmtFunc, cmd);
return super.getEnhancedStmtFunc(boundStmtFunc, op);
}
@Override
public Cqld4CqlOp apply(long value) {
public Cqld4CqlOp apply(long cycle) {
BoundStatement boundStatement;
try {
boundStatement = (BoundStatement) stmtFunc.apply(cycle);
return new Cqld4CqlPreparedStatement(
boundSession,
(BoundStatement) stmtFunc.apply(value),
boundStatement,
getMaxPages(),
isRetryReplace(),
processors
);
} catch (Exception exception) {
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
preparedStmt,
fieldsF,
cycle,
exception
);
}
}
}

View File

@@ -21,6 +21,7 @@ import com.google.gson.GsonBuilder;
import io.nosqlbench.engine.api.util.Tagged;
import io.nosqlbench.nb.api.config.params.Element;
import io.nosqlbench.nb.api.config.params.NBParams;
import io.nosqlbench.nb.api.config.standard.NBTypeConverter;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
@@ -204,11 +205,12 @@ public abstract class OpTemplate implements Tagged {
Object value = getParams().remove(name);
try {
return (V) defaultValue.getClass().cast(value);
} catch (Exception e) {
throw new RuntimeException("Unable to cast type " + value.getClass().getCanonicalName() + " to " + defaultValue.getClass().getCanonicalName(), e);
if (defaultValue.getClass().isAssignableFrom(value.getClass())) {
return (V) value;
} else {
return NBTypeConverter.convertOr(value,defaultValue);
}
}
@SuppressWarnings("unchecked")

View File

@@ -474,18 +474,17 @@
<source>17</source>
<release>17</release>
<compilerArgs>
--enable-preview
<!-- <compilerArg>-Xdoclint:all</compilerArg>-->
<!-- <compilerArg>-Xlint:all</compilerArg>-->
</compilerArgs>
<!--<compilerArgument>-Xdoclint:all</compilerArgument>-->
<!-- <compilerArgument>-Xlint:all</compilerArgument>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M6</version>
<configuration>
<!-- <argLine>&#45;&#45;enable-preview</argLine>-->
<excludes>
<exclude>**/*Integrated*Test*.java</exclude>
<exclude>**/*IntegrationTest.java</exclude>
@@ -502,6 +501,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M6</version>
<executions>
<execution>
<id>run-tests</id>
@@ -513,7 +513,6 @@
</execution>
</executions>
<configuration>
<!-- <argLine>&#45;&#45;enable-preview</argLine>-->
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<includes>
@@ -534,6 +533,7 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.3.1</version>
<configuration>
<release>17</release>
<doctitle>${javadoc.name}</doctitle>
<windowtitle>${javadoc.name}</windowtitle>
<isOffline>false</isOffline>
@@ -541,8 +541,10 @@
<detectLinks>false</detectLinks>
<detectOfflineLinks>false</detectOfflineLinks>
<!-- <additionalparam>-Xdoclint:none</additionalparam>-->
<additionalOptions>-Xdoclint:none</additionalOptions>
<additionalJOption>-Xdoclint:none</additionalJOption>
<additionalOptions>
<additionalOption>-Xdoclint:none</additionalOption>
</additionalOptions>
<!-- <additionalJOption>-Xdoclint:none</additionalJOption>-->
<doclint>none</doclint>
</configuration>
<executions>
@@ -560,6 +562,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -575,6 +578,7 @@
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.13</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
@@ -586,6 +590,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>3.0.1</version>
</plugin>
<plugin>

View File

@@ -27,7 +27,10 @@ public class OpConfigError extends ActivityInitError {
private final String configSource;
public OpConfigError(String error) {
this(error,null);
this(error,null,null);
}
public OpConfigError(String error, Throwable cause) {
this(error, null, cause);
}
public OpConfigError(String error, String configSource) {

View File

@@ -477,7 +477,7 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
if (cfgsource.containsKey(name)) {
Object object = cfgsource.get(name);
if (type.isAssignableFrom(object.getClass())) {
return Optional.of(l -> (V) cfgsource.get(name));
return Optional.of(l -> type.cast(cfgsource.get(name)));
} else if (NBTypeConverter.canConvert(object, type)) {
return Optional.of(l -> NBTypeConverter.convert(cfgsource.get(name), type));
} else {