mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-01-26 15:36:33 -06:00
partial work on inactive driver
This commit is contained in:
parent
db46f047cd
commit
e3b0c3da82
@ -1,8 +1,7 @@
|
||||
package io.nosqlbench.activitytype.cqld4.codecsupport;
|
||||
|
||||
import com.datastax.driver.core.CodecRegistry;
|
||||
import com.datastax.driver.core.Session;
|
||||
import com.datastax.driver.core.UserType;
|
||||
import com.datastax.oss.driver.api.core.session.Session;
|
||||
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -1,22 +1,31 @@
|
||||
package io.nosqlbench.activitytype.cqld4.codecsupport;
|
||||
|
||||
import com.datastax.driver.core.TypeCodec;
|
||||
import com.datastax.driver.core.UDTValue;
|
||||
import com.datastax.driver.core.UserType;
|
||||
import com.datastax.driver.extras.codecs.MappingCodec;
|
||||
import com.datastax.oss.driver.api.core.data.UdtValue;
|
||||
import com.datastax.oss.driver.api.core.type.codec.MappingCodec;
|
||||
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
|
||||
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
|
||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||
|
||||
public abstract class UDTTransformCodec<T> extends MappingCodec<T,UDTValue> {
|
||||
public abstract class UDTTransformCodec<T> extends MappingCodec<T, UdtValue> {
|
||||
|
||||
protected UserType userType;
|
||||
// protected UserType userType;
|
||||
|
||||
public UDTTransformCodec(UserType userType, Class<T> javaType) {
|
||||
super(TypeCodec.userType(userType), javaType);
|
||||
this.userType = userType;
|
||||
public UDTTransformCodec(
|
||||
@NonNull TypeCodec<T> innerCodec,
|
||||
@NonNull GenericType<UdtValue> outerJavaType
|
||||
) {
|
||||
super(innerCodec, outerJavaType);
|
||||
}
|
||||
|
||||
public UserType getUserType() {
|
||||
return userType;
|
||||
}
|
||||
//
|
||||
// public UDTTransformCodec(GenericType userType, Class<T> javaType) {
|
||||
// super(TypeCodec.userType(userType), javaType);
|
||||
// this.userType = userType;
|
||||
// }
|
||||
|
||||
// public UserType getUserType() {
|
||||
// return userType;
|
||||
// }
|
||||
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,11 @@
|
||||
package io.nosqlbench.activitytype.cqld4.codecsupport;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
|
||||
import com.datastax.oss.driver.api.core.session.Session;
|
||||
import com.datastax.oss.driver.api.core.type.UserDefinedType;
|
||||
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
|
||||
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
|
||||
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -12,17 +17,18 @@ public abstract class UserCodecProvider {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(UserCodecProvider.class);
|
||||
|
||||
public List<UDTTransformCodec> registerCodecsForCluster(
|
||||
public List<UDTTransformCodec<?>> registerCodecsForCluster(
|
||||
Session session,
|
||||
boolean allowAcrossKeyspaces
|
||||
) {
|
||||
List<UDTTransformCodec> typeCodecs = new ArrayList<>();
|
||||
List<UDTTransformCodec<?>> typeCodecs = new ArrayList<>();
|
||||
|
||||
List<KeyspaceMetadata> ksMetas = new ArrayList<>(session.getCluster().getMetadata().getKeyspaces());
|
||||
|
||||
List<KeyspaceMetadata> ksMetas = new ArrayList<>(session.getMetadata().getKeyspaces().values());
|
||||
|
||||
for (KeyspaceMetadata keyspace : ksMetas) {
|
||||
|
||||
List<UDTTransformCodec> keyspaceCodecs = registerCodecsForKeyspace(session, keyspace.getName());
|
||||
List<UDTTransformCodec> keyspaceCodecs = registerCodecsForKeyspace(session, keyspace.getName().toString());
|
||||
|
||||
for (UDTTransformCodec typeCodec : keyspaceCodecs) {
|
||||
if (typeCodecs.contains(typeCodec) && !allowAcrossKeyspaces) {
|
||||
@ -38,27 +44,27 @@ public abstract class UserCodecProvider {
|
||||
|
||||
public List<UDTTransformCodec> registerCodecsForKeyspace(Session session, String keyspace) {
|
||||
|
||||
CodecRegistry registry = session.getCluster().getConfiguration().getCodecRegistry();
|
||||
CodecRegistry registry = session.getContext().getCodecRegistry();
|
||||
|
||||
List<UDTTransformCodec> codecsForKeyspace = new ArrayList<>();
|
||||
|
||||
KeyspaceMetadata ksMeta = session.getCluster().getMetadata().getKeyspace(keyspace);
|
||||
KeyspaceMetadata ksMeta = session.getMetadata().getKeyspace(keyspace).orElseThrow();
|
||||
if (ksMeta==null) {
|
||||
logger.warn("No metadata for " + keyspace);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
Collection<UserType> typesInKeyspace = ksMeta.getUserTypes();
|
||||
Collection<UserDefinedType> typesInKeyspace = ksMeta.getUserDefinedTypes().values();
|
||||
|
||||
List<Class<? extends UDTTransformCodec>> providedCodecClasses = getUDTCodecClasses();
|
||||
|
||||
Map<UserType, Class<? extends UDTTransformCodec>> codecMap = new HashMap<>();
|
||||
Map<UserDefinedType, Class<? extends UDTTransformCodec>> codecMap = new HashMap<>();
|
||||
|
||||
for (Class<? extends TypeCodec> providedCodecClass : providedCodecClasses) {
|
||||
Class<? extends UDTTransformCodec> udtCodecClass = (Class<? extends UDTTransformCodec>) providedCodecClass;
|
||||
|
||||
List<String> targetUDTTypes = getUDTTypeNames(udtCodecClass);
|
||||
for (UserType keyspaceUserType : typesInKeyspace) {
|
||||
String ksTypeName = keyspaceUserType.getTypeName();
|
||||
for (UserDefinedType keyspaceUserType : typesInKeyspace) {
|
||||
String ksTypeName = keyspaceUserType.getName().toString();
|
||||
String globalTypeName = (ksTypeName.contains(".") ? ksTypeName.split("\\.",2)[1] : ksTypeName);
|
||||
if (targetUDTTypes.contains(ksTypeName) || targetUDTTypes.contains(globalTypeName)) {
|
||||
codecMap.put(keyspaceUserType, udtCodecClass);
|
||||
@ -66,12 +72,13 @@ public abstract class UserCodecProvider {
|
||||
}
|
||||
}
|
||||
|
||||
for (UserType userType : codecMap.keySet()) {
|
||||
for (UserDefinedType userType : codecMap.keySet()) {
|
||||
Class<? extends UDTTransformCodec> codecClass = codecMap.get(userType);
|
||||
Class<?> udtJavaType = getUDTJavaType(codecClass);
|
||||
UDTTransformCodec udtCodec = instantiate(userType, codecClass, udtJavaType);
|
||||
codecsForKeyspace.add(udtCodec);
|
||||
registry.register(udtCodec);
|
||||
((MutableCodecRegistry)registry).register(udtCodec);
|
||||
|
||||
logger.info("registered codec:" + udtCodec);
|
||||
}
|
||||
|
||||
@ -79,9 +86,10 @@ public abstract class UserCodecProvider {
|
||||
|
||||
}
|
||||
|
||||
private UDTTransformCodec instantiate(UserType key, Class<? extends UDTTransformCodec> codecClass, Class<?> javaType) {
|
||||
private UDTTransformCodec instantiate(UserDefinedType key, Class<? extends UDTTransformCodec> codecClass,
|
||||
Class<?> javaType) {
|
||||
try {
|
||||
Constructor<? extends UDTTransformCodec> ctor = codecClass.getConstructor(UserType.class, Class.class);
|
||||
Constructor<? extends UDTTransformCodec> ctor = codecClass.getConstructor(UserDefinedType.class, Class.class);
|
||||
UDTTransformCodec typeCodec = ctor.newInstance(key, javaType);
|
||||
return typeCodec;
|
||||
} catch (Exception e) {
|
||||
|
@ -1,6 +1,12 @@
|
||||
package io.nosqlbench.activitytype.cqld4.errorhandling;
|
||||
|
||||
import com.datastax.driver.core.exceptions.*;
|
||||
import com.datastax.oss.driver.api.core.DriverException;
|
||||
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
|
||||
import com.datastax.oss.driver.api.core.auth.AuthenticationException;
|
||||
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
|
||||
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
|
||||
import com.datastax.oss.driver.api.core.servererrors.*;
|
||||
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
|
||||
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.*;
|
||||
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
|
||||
import org.slf4j.Logger;
|
||||
@ -15,22 +21,25 @@ import java.util.Map;
|
||||
* This enumerates all known exception classes, including supertypes,
|
||||
* for the purposes of stable naming in error handling.
|
||||
* This is current as of com.datastax.cassandra:cassandra-driver-core:3.2.0
|
||||
*
|
||||
* TODO: for cqld4, add all exceptions again, keeping the previous ones in their existing places, but eliding the
|
||||
* removed ones and leaving a place holder there, adding the new ones after
|
||||
*/
|
||||
public enum CQLExceptionEnum implements ResultReadable {
|
||||
|
||||
FrameTooLongException(FrameTooLongException.class, 1),
|
||||
CodecNotFoundException(CodecNotFoundException.class, 2),
|
||||
DriverException(DriverException.class, 3),
|
||||
FrameTooLongException(com.datastax.oss.driver.api.core.connection.FrameTooLongException.class, 1),
|
||||
CodecNotFoundException(com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException.class, 2),
|
||||
DriverException(com.datastax.oss.driver.api.core.DriverException.class, 3),
|
||||
|
||||
AuthenticationException(AuthenticationException.class, 4),
|
||||
AuthenticationException(com.datastax.oss.driver.api.core.auth.AuthenticationException.class, 4),
|
||||
TraceRetrievalException(TraceRetrievalException.class, 5),
|
||||
UnsupportedProtocolVersionException(UnsupportedProtocolVersionException.class, 6),
|
||||
UnsupportedProtocolVersionException(com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException.class, 6),
|
||||
NoHostAvailableException(NoHostAvailableException.class, 7),
|
||||
QueryValidationException(QueryValidationException.class, 8),
|
||||
InvalidQueryException(InvalidQueryException.class, 9),
|
||||
InvalidConfigurationInQueryException(InvalidConfigurationInQueryException.class, 10),
|
||||
UnauthorizedException(UnauthorizedException.class, 11),
|
||||
SyntaxError(SyntaxError.class, 12),
|
||||
QueryValidationException(com.datastax.oss.driver.api.core.servererrors.QueryValidationException.class, 8),
|
||||
InvalidQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidQueryException.class, 9),
|
||||
InvalidConfigurationInQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidConfigurationInQueryException.class, 10),
|
||||
UnauthorizedException(com.datastax.oss.driver.api.core.servererrors.UnauthorizedException.class, 11),
|
||||
SyntaxError(com.datastax.oss.driver.api.core.servererrors.SyntaxError.class, 12),
|
||||
AlreadyExistsException(AlreadyExistsException.class, 13),
|
||||
UnpreparedException(UnpreparedException.class, 14),
|
||||
InvalidTypeException(InvalidTypeException.class, 15),
|
||||
@ -55,7 +64,7 @@ public enum CQLExceptionEnum implements ResultReadable {
|
||||
PagingStateException(PagingStateException.class, 34),
|
||||
UnresolvedUserTypeException(UnresolvedUserTypeException.class, 35),
|
||||
UnsupportedFeatureException(UnsupportedFeatureException.class, 36),
|
||||
BusyConnectionException(BusyConnectionException.class, 37),
|
||||
BusyConnectionException(com.datastax.oss.driver.api.core.connection.BusyConnectionException.class, 37),
|
||||
|
||||
ChangeUnappliedCycleException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException.class, 38),
|
||||
ResultSetVerificationException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ResultSetVerificationException.class, 39),
|
||||
|
@ -1,7 +1,7 @@
|
||||
package io.nosqlbench.activitytype.cqld4.statements.binders;
|
||||
|
||||
import com.datastax.driver.core.ConsistencyLevel;
|
||||
import com.datastax.driver.core.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder;
|
||||
|
||||
@ -22,7 +22,7 @@ public class SimpleStatementValuesBinder
|
||||
|
||||
@Override
|
||||
public Statement bindValues(SimpleStatement context, Object[] values) {
|
||||
String query = context.getQueryString();
|
||||
String query = context.getQuery();
|
||||
if(parametrized) {
|
||||
String[] splits = query.split("\\?");
|
||||
assert splits.length == values.length+1;
|
||||
@ -36,7 +36,7 @@ public class SimpleStatementValuesBinder
|
||||
System.out.println(query);
|
||||
|
||||
}
|
||||
SimpleStatement simpleStatement = new SimpleStatement(query);
|
||||
SimpleStatement simpleStatement = SimpleStatement.newInstance(query);
|
||||
ConsistencyLevel cl = context.getConsistencyLevel();
|
||||
if(cl != null){
|
||||
simpleStatement.setConsistencyLevel(context.getConsistencyLevel());
|
||||
|
@ -1,12 +1,20 @@
|
||||
package io.nosqlbench.activitytype.cqld4.statements.binders;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import com.datastax.oss.driver.api.core.ProtocolVersion;
|
||||
import com.datastax.oss.driver.api.core.cql.*;
|
||||
import com.datastax.oss.driver.api.core.session.Session;
|
||||
import com.datastax.oss.driver.api.core.type.DataType;
|
||||
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
|
||||
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
|
||||
import io.nosqlbench.virtdata.api.bindings.VALUE;
|
||||
import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class UnsettableValuesBinder implements ValuesArrayBinder<PreparedStatement, Statement> {
|
||||
@ -18,35 +26,39 @@ public class UnsettableValuesBinder implements ValuesArrayBinder<PreparedStateme
|
||||
|
||||
public UnsettableValuesBinder(Session session) {
|
||||
this.session = session;
|
||||
this.codecRegistry = session.getCluster().getConfiguration().getCodecRegistry();
|
||||
this.protocolVersion = this.session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
|
||||
this.codecRegistry = session.getContext().getCodecRegistry();
|
||||
this.protocolVersion = this.session.getContext().getProtocolVersion();
|
||||
}
|
||||
|
||||
|
||||
// TODO: Allow for warning when nulls are passed and they aren't expected
|
||||
@Override
|
||||
public Statement bindValues(PreparedStatement preparedStatement, Object[] objects) {
|
||||
int i=-1;
|
||||
int i = -1;
|
||||
try {
|
||||
BoundStatement boundStmt = preparedStatement.bind();
|
||||
List<ColumnDefinitions.Definition> defs = preparedStatement.getVariables().asList();
|
||||
ColumnDefinitions variableDefinitions = preparedStatement.getVariableDefinitions();
|
||||
for (i = 0; i < objects.length; i++) {
|
||||
Object value = objects[i];
|
||||
if (VALUE.unset != value) {
|
||||
if (null==value) {
|
||||
if (null == value) {
|
||||
boundStmt.setToNull(i);
|
||||
} else {
|
||||
DataType cqlType = defs.get(i).getType();
|
||||
TypeCodec<Object> codec = codecRegistry.codecFor(cqlType, value);
|
||||
ByteBuffer serialized = codec.serialize(value, protocolVersion);
|
||||
boundStmt.setBytesUnsafe(i,serialized);
|
||||
ColumnDefinition definition = variableDefinitions.get(i);
|
||||
DataType cqlType = definition.getType();
|
||||
TypeCodec<Object> objectTypeCodec = codecRegistry.codecFor(cqlType, value);
|
||||
ByteBuffer serialized = objectTypeCodec.encode(value, protocolVersion);
|
||||
boundStmt.setBytesUnsafe(i, serialized);
|
||||
}
|
||||
}
|
||||
}
|
||||
return boundStmt;
|
||||
} catch (Exception e) {
|
||||
String typNam = (objects[i]==null ? "NULL" : objects[i].getClass().getCanonicalName());
|
||||
logger.error("Error binding column " + preparedStatement.getVariables().asList().get(i).getName() + " with class " + typNam + ": " + e.getMessage(), e);
|
||||
String typNam = (objects[i] == null ? "NULL" : objects[i].getClass().getCanonicalName());
|
||||
List<ColumnDefinition> cdefs = new ArrayList<>();
|
||||
preparedStatement.getVariableDefinitions().forEach(cdefs::add);
|
||||
|
||||
logger.error("Error binding column " + cdefs.get(i).getName() + " with class " + typNam + ": " + e.getMessage(), e);
|
||||
throw e;
|
||||
// StringBuilder sb = new StringBuilder();
|
||||
// sb.append("Error binding objects to prepared statement directly, falling back to diagnostic binding layer:");
|
||||
|
@ -1,6 +1,6 @@
|
||||
package io.nosqlbench.activitytype.cqld4.statements.core;
|
||||
|
||||
import com.datastax.driver.core.ConsistencyLevel;
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user