mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
fix paging, add page, row, byte metrics for CQL driver
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,13 +19,23 @@ package io.nosqlbench.adapter.cqld4;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
|
||||
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4CqlReboundStatement(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int lwtRetryCount, BoundStatement rebound, RSProcessors processors) {
|
||||
super(session,maxPages,retryReplace,maxLwtRetries,lwtRetryCount, processors);
|
||||
public Cqld4CqlReboundStatement(
|
||||
CqlSession session,
|
||||
int maxPages,
|
||||
boolean retryReplace,
|
||||
int maxLwtRetries,
|
||||
int lwtRetryCount,
|
||||
BoundStatement rebound,
|
||||
RSProcessors processors,
|
||||
CqlOpMetrics metrics
|
||||
) {
|
||||
super(session,maxPages,retryReplace,maxLwtRetries,lwtRetryCount, processors, metrics);
|
||||
this.stmt = rebound;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
|
||||
import java.util.Map;
|
||||
@@ -29,7 +29,7 @@ public class Cqld4PrintProcessor implements ResultSetProcessor {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(long cycle, ResultSet container) {
|
||||
public void start(long cycle, AsyncResultSet container) {
|
||||
sb.setLength(0);
|
||||
sb.append("c[").append(cycle).append("] ");
|
||||
}
|
||||
|
||||
@@ -290,6 +290,8 @@ public class Cqld4Space implements AutoCloseable {
|
||||
.add(Param.optional("whitelist", String.class, "list of whitelist hosts addresses"))
|
||||
.add(Param.optional("showstmt", Boolean.class, "show the contents of the statement in the log"))
|
||||
.add(Param.optional("cloud_proxy_address", String.class, "Cloud Proxy Address"))
|
||||
.add(Param.optional("maxpages", Integer.class, "Maximum number of pages allowed per CQL request"))
|
||||
.add(Param.optional("maxretryreplace", Integer.class, "Maximum number of retry replaces with LWT for a CQL request"))
|
||||
.add(SSLKsFactory.get().getConfigModel())
|
||||
.add(getDriverOptionsModel())
|
||||
.add(new OptionHelpers(new OptionsMap()).getConfigModel())
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.ResultProcessor;
|
||||
@@ -25,7 +26,7 @@ import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RSProcessors implements ResultProcessor<ResultSet,Row>, Supplier<List<ResultSetProcessor>> {
|
||||
public class RSProcessors implements ResultProcessor<AsyncResultSet,Row>, Supplier<List<ResultSetProcessor>> {
|
||||
|
||||
private final List<Supplier<ResultSetProcessor>> suppliers = new ArrayList<>();
|
||||
|
||||
@@ -47,7 +48,7 @@ public class RSProcessors implements ResultProcessor<ResultSet,Row>, Supplier<Li
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(long cycle, ResultSet container) {
|
||||
public void start(long cycle, AsyncResultSet container) {
|
||||
for (ResultSetProcessor processor : get()) {
|
||||
processor.start(cycle, container);
|
||||
}
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.ResultProcessor;
|
||||
|
||||
public interface ResultSetProcessor extends ResultProcessor<ResultSet, Row> {
|
||||
public interface ResultSetProcessor extends ResultProcessor<AsyncResultSet, Row> {
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.adapter.cqld4.exceptions;
|
||||
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
|
||||
/**
|
||||
@@ -27,18 +28,18 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
*/
|
||||
public class ExceededRetryReplaceException extends CqlGenericCycleException {
|
||||
|
||||
private final ResultSet resultSet;
|
||||
private final AsyncResultSet resultSet;
|
||||
private final String queryString;
|
||||
private final int retries;
|
||||
|
||||
public ExceededRetryReplaceException(ResultSet resultSet, String queryString, int retries) {
|
||||
public ExceededRetryReplaceException(AsyncResultSet resultSet, String queryString, int retries) {
|
||||
super("After " + retries + " retries using the retryreplace option, Operation was not applied:" + queryString);
|
||||
this.retries = retries;
|
||||
this.resultSet = resultSet;
|
||||
this.queryString = queryString;
|
||||
}
|
||||
|
||||
public ResultSet getResultSet() {
|
||||
public AsyncResultSet getResultSet() {
|
||||
return resultSet;
|
||||
}
|
||||
public String getQueryString() { return queryString; }
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.exceptions;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
|
||||
/**
|
||||
* <p>This is not a core exception. It was added to the CQL activity type
|
||||
@@ -35,14 +35,14 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
*/
|
||||
public class UnexpectedPagingException extends RuntimeException {
|
||||
|
||||
private final ResultSet resultSet;
|
||||
private final AsyncResultSet resultSet;
|
||||
private final String queryString;
|
||||
private final int fetchSize;
|
||||
private final int fetchedPages;
|
||||
private final int maxpages;
|
||||
|
||||
public UnexpectedPagingException(
|
||||
ResultSet resultSet,
|
||||
AsyncResultSet resultSet,
|
||||
String queryString,
|
||||
int fetchedPages,
|
||||
int maxpages,
|
||||
@@ -54,16 +54,15 @@ public class UnexpectedPagingException extends RuntimeException {
|
||||
this.fetchSize = fetchSize;
|
||||
}
|
||||
|
||||
public ResultSet getResultSet() {
|
||||
public AsyncResultSet getResultSet() {
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Additional paging would be required to read the results from this query fully" +
|
||||
", but the user has not explicitly indicated that paging was expected.")
|
||||
.append(" fetched/allowed: ").append(fetchedPages).append("/").append(maxpages)
|
||||
.append(" fetchSize(").append(fetchSize).append("): ").append(queryString);
|
||||
return sb.toString();
|
||||
String sb = "Additional paging would be required to read the results from this query fully" +
|
||||
", but the user has not explicitly indicated that paging was expected." +
|
||||
" fetched/allowed: " + fetchedPages + "/" + maxpages +
|
||||
" fetchSize(" + fetchSize + "): " + queryString;
|
||||
return sb;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -14,14 +14,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
package io.nosqlbench.adapter.cqld4.instruments;
|
||||
|
||||
public class Cqld4OpMetrics {
|
||||
public void onStart() {
|
||||
public interface CqlOpMetrics {
|
||||
|
||||
}
|
||||
void recordFetchedPages(int fetchedPages);
|
||||
|
||||
public void onSuccess() {
|
||||
void recordFetchedRows(int fetchedRows);
|
||||
|
||||
}
|
||||
void recordFetchedBytes(int fetchedBytes);
|
||||
}
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
|
||||
@@ -23,12 +24,13 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
|
||||
import com.datastax.oss.driver.api.core.cql.*;
|
||||
import com.datastax.oss.driver.api.core.metadata.Node;
|
||||
import com.datastax.oss.driver.api.core.metadata.token.Token;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Space;
|
||||
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@@ -37,15 +39,17 @@ import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, Cqld4Space> {
|
||||
public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, Cqld4Space> implements CqlOpMetrics {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger("CQLD4");
|
||||
|
||||
private final int maxpages;
|
||||
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final boolean isRetryReplace;
|
||||
private final int maxLwtRetries;
|
||||
private final Histogram rowsHistogram;
|
||||
private final Histogram pagesHistogram;
|
||||
private final Histogram payloadBytesHistogram;
|
||||
|
||||
public Cqld4BaseOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
||||
super(adapter, op);
|
||||
@@ -53,6 +57,9 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
this.maxpages = op.getStaticConfigOr("maxpages", 1);
|
||||
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
|
||||
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);
|
||||
this.rowsHistogram = ActivityMetrics.histogram(op, "rows", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.pagesHistogram = ActivityMetrics.histogram(op, "pages", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.payloadBytesHistogram = ActivityMetrics.histogram(op, "payload_bytes", op.getStaticConfigOr("hdr_digits", 3));
|
||||
}
|
||||
|
||||
public int getMaxPages() {
|
||||
@@ -76,7 +83,7 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
* All implementations of a CQL Statement Dispenser should be using the method
|
||||
* provided by this function. This ensures that {@link Statement}-level attributes
|
||||
* are handled uniformly and in one place.
|
||||
*
|
||||
* <p>
|
||||
* This takes the base statement function and decorates it optionally with each
|
||||
* additional qualified modifier, short-circuiting those which are not specified.
|
||||
* This allows default behavior to take precedence as well as avoids unnecessary calling
|
||||
@@ -136,5 +143,18 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordFetchedRows(int rows) {
|
||||
rowsHistogram.update(rows);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordFetchedPages(int pages) {
|
||||
pagesHistogram.update(pages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordFetchedBytes(int bytes) {
|
||||
payloadBytesHistogram.update(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +91,8 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries(),
|
||||
processors
|
||||
processors,
|
||||
this
|
||||
);
|
||||
} catch (Exception exception) {
|
||||
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
|
||||
|
||||
@@ -50,7 +50,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries()
|
||||
getMaxLwtRetries(),
|
||||
this
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries()
|
||||
getMaxLwtRetries(),
|
||||
this
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,13 +19,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BatchStatement;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
|
||||
|
||||
public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BatchStatement stmt;
|
||||
|
||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxPage, int maxLwtRetries, boolean retryReplace) {
|
||||
super(session,maxPage,retryReplace,maxLwtRetries,new RSProcessors());
|
||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxPage, int maxLwtRetries, boolean retryReplace, CqlOpMetrics metrics) {
|
||||
super(session,maxPage,retryReplace,maxLwtRetries,new RSProcessors(), metrics);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
|
||||
@@ -59,12 +59,20 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
|
||||
|
||||
private final ThreadLocal<List<Row>> results = new ThreadLocal<>();
|
||||
|
||||
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
||||
public Cqld4CqlOp(
|
||||
CqlSession session,
|
||||
int maxPages,
|
||||
boolean retryReplace,
|
||||
int maxLwtRetries,
|
||||
RSProcessors processors,
|
||||
CqlOpMetrics metrics
|
||||
) {
|
||||
this.session = session;
|
||||
this.maxPages = maxPages;
|
||||
this.retryReplace = retryReplace;
|
||||
this.maxLwtRetries =maxLwtRetries;
|
||||
this.processors = processors;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
|
||||
@@ -74,59 +82,47 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
|
||||
this.maxLwtRetries =maxLwtRetries;
|
||||
this.retryReplaceCount=retryRplaceCount;
|
||||
this.processors = processors;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
public final List<Row> apply(long cycle) {
|
||||
|
||||
Statement<?> stmt = getStmt();
|
||||
rs = session.execute(stmt);
|
||||
processors.start(cycle, rs);
|
||||
int totalRows = 0;
|
||||
Statement<?> statement = getStmt();
|
||||
logger.trace(() -> "apply() invoked, statement obtained, executing async with page size: " + statement.getPageSize() + " thread local rows: ");
|
||||
CompletionStage<AsyncResultSet> statementStage = session.executeAsync(statement);
|
||||
|
||||
if (!rs.wasApplied()) {
|
||||
if (!retryReplace) {
|
||||
throw new ChangeUnappliedCycleException(rs, getQueryString());
|
||||
} else {
|
||||
retryReplaceCount++;
|
||||
if (retryReplaceCount >maxLwtRetries) {
|
||||
throw new ExceededRetryReplaceException(rs,getQueryString(), retryReplaceCount);
|
||||
}
|
||||
Row one = rs.one();
|
||||
processors.buffer(one);
|
||||
totalRows++;
|
||||
nextOp = this.rebindLwt(stmt, one);
|
||||
CompletionStage<List<Row>> rowsStage = statementStage.thenCompose((rs) -> {
|
||||
processors.start(cycle, rs);
|
||||
ArrayList<Row> completeRowSet = new ArrayList<>();
|
||||
if (!rs.wasApplied()) {
|
||||
handleRebindLWT(rs, statement);
|
||||
}
|
||||
return collect(rs, completeRowSet, cycle);
|
||||
}).exceptionally(throwable -> {
|
||||
if (throwable instanceof RuntimeException tre) throw tre;
|
||||
throw new RuntimeException(throwable);
|
||||
});
|
||||
|
||||
try {
|
||||
return rowsStage.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof RuntimeException re) throw re;
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
processors.flush();
|
||||
metrics.recordFetchedPages(fetchedPages);
|
||||
metrics.recordFetchedRows(fetchedRows);
|
||||
metrics.recordFetchedBytes(fetchedBytes);
|
||||
}
|
||||
|
||||
// Paginated Op
|
||||
|
||||
Iterator<Row> reader = rs.iterator();
|
||||
int pages = 0;
|
||||
// TODO/MVEL: An optimization to this would be to collect the results in a result set processor,
|
||||
// but allow/require this processor to be added to an op _only_ in the event that it would
|
||||
// be needed by a downstream consumer like the MVEL expected result evaluator
|
||||
|
||||
var resultRows = new ArrayList<Row>();
|
||||
while (true) {
|
||||
int pageRows = rs.getAvailableWithoutFetching();
|
||||
for (int i = 0; i < pageRows; i++) {
|
||||
Row row = reader.next();
|
||||
resultRows.add(row);
|
||||
processors.buffer(row);
|
||||
}
|
||||
if (pages++ > maxPages) {
|
||||
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize());
|
||||
}
|
||||
if (rs.isFullyFetched()) {
|
||||
results.set(resultRows);
|
||||
break;
|
||||
}
|
||||
totalRows += pageRows;
|
||||
}
|
||||
processors.flush();
|
||||
return results.get();
|
||||
// logger.trace(() -> "\n\n--- Rows collected for cycle: " + cycle + " count: "
|
||||
// + rs.size() + " dt: " + System.nanoTime());
|
||||
//
|
||||
// results.set(completeRowSet);
|
||||
// processors.flush();
|
||||
}
|
||||
|
||||
// private BiFunction<AsyncResultSet,Throwable> handler
|
||||
@Override
|
||||
public Op getNextOp() {
|
||||
Op next = nextOp;
|
||||
@@ -134,12 +130,8 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
|
||||
return next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ?> capture() {
|
||||
if (rs == null) {
|
||||
throw new UndefinedResultSetException(this);
|
||||
}
|
||||
return null;
|
||||
throw new NotImplementedException("Not implemented for Cqld4CqlOp");
|
||||
}
|
||||
|
||||
public abstract Statement<?> getStmt();
|
||||
@@ -148,7 +140,44 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
|
||||
|
||||
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
|
||||
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
|
||||
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors);
|
||||
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors, metrics);
|
||||
}
|
||||
|
||||
private CompletionStage<List<Row>> collect(AsyncResultSet resultSet, ArrayList<Row> rowList, final long cycle) {
|
||||
fetchedBytes+=resultSet.getExecutionInfo().getResponseSizeInBytes();
|
||||
if (++fetchedPages > maxPages) {
|
||||
throw new UnexpectedPagingException(resultSet, getQueryString(), fetchedPages, maxPages, getStmt().getPageSize());
|
||||
}
|
||||
int remaining = resultSet.remaining();
|
||||
fetchedRows += remaining;
|
||||
rowList.ensureCapacity(rowList.size() + remaining);
|
||||
for (Row row : resultSet.currentPage()) {
|
||||
rowList.add(row);
|
||||
processors.buffer(row);
|
||||
}
|
||||
if (resultSet.hasMorePages()) {
|
||||
return resultSet.fetchNextPage().thenCompose(rs -> collect(rs, rowList, cycle));
|
||||
} else {
|
||||
processors.start(cycle, resultSet);
|
||||
return CompletableFuture.completedStage(rowList);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleRebindLWT(AsyncResultSet resultSet, Statement<?> statement) {
|
||||
if (++lwtRetries < maxLwtRetries) {
|
||||
throw new ExceededRetryReplaceException(resultSet, getQueryString(), lwtRetries);
|
||||
}
|
||||
if (!retryReplace) {
|
||||
throw new ChangeUnappliedCycleException(resultSet, getQueryString());
|
||||
} else {
|
||||
retryReplaceCount++;
|
||||
if (retryReplaceCount > maxLwtRetries) {
|
||||
throw new ExceededRetryReplaceException(resultSet, getQueryString(), retryReplaceCount);
|
||||
}
|
||||
Row one = resultSet.one();
|
||||
processors.buffer(one);
|
||||
nextOp = this.rebindLwt(statement, one);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,13 +19,15 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
|
||||
|
||||
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
||||
super(session, maxPages, retryReplace, maxLwtRetries, processors);
|
||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages,
|
||||
boolean retryReplace, int maxLwtRetries, RSProcessors processors, CqlOpMetrics metrics) {
|
||||
super(session, maxPages, retryReplace, maxLwtRetries, processors, metrics);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
@@ -37,4 +39,5 @@ public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
||||
public String getQueryString() {
|
||||
return stmt.getPreparedStatement().getQuery();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,12 +19,13 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
|
||||
|
||||
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
||||
private final SimpleStatement stmt;
|
||||
|
||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries) {
|
||||
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors());
|
||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, CqlOpMetrics metrics) {
|
||||
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors(), metrics);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.processors;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
|
||||
import io.nosqlbench.virtdata.core.templates.CapturePoint;
|
||||
@@ -32,7 +32,7 @@ public class CqlFieldCaptureProcessor implements ResultSetProcessor {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(long cycle, ResultSet container) {
|
||||
public void start(long cycle, AsyncResultSet container) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.processors;
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
|
||||
|
||||
@@ -28,12 +28,11 @@ import java.util.ArrayList;
|
||||
public class RSIterableCapture implements ResultSetProcessor {
|
||||
|
||||
private long cycle;
|
||||
private ArrayList<Row> rows = new ArrayList<>();
|
||||
private final ArrayList<Row> rows = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void start(long cycle, ResultSet container) {
|
||||
public void start(long cycle, AsyncResultSet container) {
|
||||
this.cycle = cycle;
|
||||
rows = new ArrayList<Row>(container.getAvailableWithoutFetching());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user