switch to async resultset

This commit is contained in:
Jonathan Shook
2023-08-30 00:57:54 -05:00
parent cf112e5024
commit 7f881fe3e7
2 changed files with 23 additions and 14 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
@@ -27,16 +28,16 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
*/
public class ChangeUnappliedCycleException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final AsyncResultSet resultSet;
private final String queryString;
public ChangeUnappliedCycleException(ResultSet resultSet, String queryString) {
public ChangeUnappliedCycleException(AsyncResultSet resultSet, String queryString) {
super("Operation was not applied:" + queryString);
this.resultSet = resultSet;
this.queryString = queryString;
}
public ResultSet getResultSet() {
public AsyncResultSet getResultSet() {
return resultSet;
}
public String getQueryString() { return queryString; }

View File

@@ -17,21 +17,28 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.*;
import io.nosqlbench.adapter.cqld4.Cqld4CqlReboundStatement;
import io.nosqlbench.adapter.cqld4.LWTRebinder;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
// TODO: add statement filtering
@@ -51,13 +58,14 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
private final int maxPages;
private final boolean retryReplace;
private final int maxLwtRetries;
private int retryReplaceCount =0;
private ResultSet rs;
private Cqld4CqlOp nextOp;
private final RSProcessors processors;
private final ThreadLocal<List<Row>> results = new ThreadLocal<>();
private final CqlOpMetrics metrics;
private int retryReplaceCount = 0;
private Cqld4CqlOp nextOp;
private int fetchedPages = 0;
private int fetchedRows = 0;
private int fetchedBytes = 0;
private int lwtRetries = 0;
public Cqld4CqlOp(
CqlSession session,