mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
commit
d0f25f5c68
@ -24,8 +24,8 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
|||||||
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
|
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
|
||||||
private final BoundStatement stmt;
|
private final BoundStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, BoundStatement rebound, RSProcessors processors) {
|
public Cqld4CqlReboundStatement(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int lwtRetryCount, BoundStatement rebound, RSProcessors processors) {
|
||||||
super(session,maxpages,retryreplace,processors);
|
super(session,maxPages,retryReplace,maxLwtRetries,lwtRetryCount, processors);
|
||||||
this.stmt = rebound;
|
this.stmt = rebound;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,45 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.cqld4.exceptions;
|
||||||
|
|
||||||
|
|
||||||
|
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a synthetic error generated by the cql driver in NoSQLBench when
|
||||||
|
* the retryreplace option is used but the number of LWT round-trips from the driver
|
||||||
|
* is excessive. The number of LWT round trips allowed is controlled by the
|
||||||
|
* maxlwtretries op field.
|
||||||
|
*/
|
||||||
|
public class ExceededRetryReplaceException extends CqlGenericCycleException {
|
||||||
|
|
||||||
|
private final ResultSet resultSet;
|
||||||
|
private final String queryString;
|
||||||
|
private final int retries;
|
||||||
|
|
||||||
|
public ExceededRetryReplaceException(ResultSet resultSet, String queryString, int retries) {
|
||||||
|
super("After " + retries + " retries using the retryreplace option, Operation was not applied:" + queryString);
|
||||||
|
this.retries = retries;
|
||||||
|
this.resultSet = resultSet;
|
||||||
|
this.queryString = queryString;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResultSet getResultSet() {
|
||||||
|
return resultSet;
|
||||||
|
}
|
||||||
|
public String getQueryString() { return queryString; }
|
||||||
|
}
|
@ -45,12 +45,14 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
|||||||
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
|
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
|
||||||
private final LongFunction<CqlSession> sessionFunc;
|
private final LongFunction<CqlSession> sessionFunc;
|
||||||
private final boolean isRetryReplace;
|
private final boolean isRetryReplace;
|
||||||
|
private final int maxLwtRetries;
|
||||||
|
|
||||||
public Cqld4BaseOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
public Cqld4BaseOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
||||||
super(adapter, op);
|
super(adapter, op);
|
||||||
this.sessionFunc = sessionFunc;
|
this.sessionFunc = sessionFunc;
|
||||||
this.maxpages = op.getStaticConfigOr("maxpages", 1);
|
this.maxpages = op.getStaticConfigOr("maxpages", 1);
|
||||||
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
|
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
|
||||||
|
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxPages() {
|
public int getMaxPages() {
|
||||||
@ -61,6 +63,11 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
|||||||
return isRetryReplace;
|
return isRetryReplace;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxLwtRetries() {
|
||||||
|
return maxLwtRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public LongFunction<CqlSession> getSessionFunc() {
|
public LongFunction<CqlSession> getSessionFunc() {
|
||||||
return sessionFunc;
|
return sessionFunc;
|
||||||
}
|
}
|
||||||
|
@ -89,6 +89,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
boundStatement,
|
boundStatement,
|
||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace(),
|
isRetryReplace(),
|
||||||
|
getMaxLwtRetries(),
|
||||||
processors
|
processors
|
||||||
);
|
);
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
|
@ -49,7 +49,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
getSessionFunc().apply(value),
|
getSessionFunc().apply(value),
|
||||||
(SimpleStatement) stmtFunc.apply(value),
|
(SimpleStatement) stmtFunc.apply(value),
|
||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace()
|
isRetryReplace(),
|
||||||
|
getMaxLwtRetries()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +46,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
getSessionFunc().apply(value),
|
getSessionFunc().apply(value),
|
||||||
(SimpleStatement) stmtFunc.apply(value),
|
(SimpleStatement) stmtFunc.apply(value),
|
||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace()
|
isRetryReplace(),
|
||||||
|
getMaxLwtRetries()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,8 +24,8 @@ public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
|
|||||||
|
|
||||||
private final BatchStatement stmt;
|
private final BatchStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace) {
|
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxPage, int maxLwtRetries, boolean retryReplace) {
|
||||||
super(session,maxpages,retryreplace,new RSProcessors());
|
super(session,maxPage,retryReplace,maxLwtRetries,new RSProcessors());
|
||||||
this.stmt = stmt;
|
this.stmt = stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.cql.Row;
|
|||||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||||
import io.nosqlbench.adapter.cqld4.*;
|
import io.nosqlbench.adapter.cqld4.*;
|
||||||
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
|
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
|
||||||
|
import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
|
||||||
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
||||||
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
|
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
|
||||||
@ -44,17 +45,29 @@ import java.util.Map;
|
|||||||
public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator, OpResultSize {
|
public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator, OpResultSize {
|
||||||
|
|
||||||
private final CqlSession session;
|
private final CqlSession session;
|
||||||
private final int maxpages;
|
private final int maxPages;
|
||||||
private final boolean retryreplace;
|
private final boolean retryReplace;
|
||||||
|
private final int maxLwtRetries;
|
||||||
|
private int retryReplaceCount =0;
|
||||||
|
|
||||||
private ResultSet rs;
|
private ResultSet rs;
|
||||||
private Cqld4CqlOp nextOp;
|
private Cqld4CqlOp nextOp;
|
||||||
private final RSProcessors processors;
|
private final RSProcessors processors;
|
||||||
|
|
||||||
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, RSProcessors processors) {
|
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
this.maxpages = maxpages;
|
this.maxPages = maxPages;
|
||||||
this.retryreplace = retryreplace;
|
this.retryReplace = retryReplace;
|
||||||
|
this.maxLwtRetries =maxLwtRetries;
|
||||||
|
this.processors = processors;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
|
||||||
|
this.session = session;
|
||||||
|
this.maxPages = maxPages;
|
||||||
|
this.retryReplace = retryReplace;
|
||||||
|
this.maxLwtRetries =maxLwtRetries;
|
||||||
|
this.retryReplaceCount=retryRplaceCount;
|
||||||
this.processors = processors;
|
this.processors = processors;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,9 +79,13 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
|
|
||||||
if (!rs.wasApplied()) {
|
if (!rs.wasApplied()) {
|
||||||
if (!retryreplace) {
|
if (!retryReplace) {
|
||||||
throw new ChangeUnappliedCycleException(rs, getQueryString());
|
throw new ChangeUnappliedCycleException(rs, getQueryString());
|
||||||
} else {
|
} else {
|
||||||
|
retryReplaceCount++;
|
||||||
|
if (retryReplaceCount >maxLwtRetries) {
|
||||||
|
throw new ExceededRetryReplaceException(rs,getQueryString(), retryReplaceCount);
|
||||||
|
}
|
||||||
Row one = rs.one();
|
Row one = rs.one();
|
||||||
processors.buffer(one);
|
processors.buffer(one);
|
||||||
totalRows++;
|
totalRows++;
|
||||||
@ -86,8 +103,8 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
Row row = reader.next();
|
Row row = reader.next();
|
||||||
processors.buffer(row);
|
processors.buffer(row);
|
||||||
}
|
}
|
||||||
if (pages++ > maxpages) {
|
if (pages++ > maxPages) {
|
||||||
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxpages, stmt.getPageSize());
|
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize());
|
||||||
}
|
}
|
||||||
if (rs.isFullyFetched()) {
|
if (rs.isFullyFetched()) {
|
||||||
break;
|
break;
|
||||||
@ -119,7 +136,7 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
|
|
||||||
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
|
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
|
||||||
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
|
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
|
||||||
return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, rebound, processors);
|
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,8 @@ public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
|||||||
|
|
||||||
private final BoundStatement stmt;
|
private final BoundStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, RSProcessors processors) {
|
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
||||||
super(session,maxpages,retryreplace,processors);
|
super(session,maxPages,retryReplace,maxLwtRetries,processors);
|
||||||
this.stmt = stmt;
|
this.stmt = stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,8 +23,8 @@ import io.nosqlbench.adapter.cqld4.RSProcessors;
|
|||||||
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
||||||
private final SimpleStatement stmt;
|
private final SimpleStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace) {
|
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries) {
|
||||||
super(session, maxpages,retryreplace, new RSProcessors());
|
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors());
|
||||||
this.stmt = stmt;
|
this.stmt = stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,6 +198,11 @@ params:
|
|||||||
# match the preconditions) in order to test LWT performance.
|
# match the preconditions) in order to test LWT performance.
|
||||||
retryreplace: true
|
retryreplace: true
|
||||||
|
|
||||||
|
# Set the number of retries allowed by the retryreplace option. This is set
|
||||||
|
# to 1 conservatively, as with the maxpages setting. This means that you will
|
||||||
|
# see an error if the first LWT retry after an unapplied change was not successful.
|
||||||
|
maxlwtretries: 1
|
||||||
|
|
||||||
## The following options are meant for advanced testing scenarios only,
|
## The following options are meant for advanced testing scenarios only,
|
||||||
## and are not generally meant to be used in typical application-level,
|
## and are not generally meant to be used in typical application-level,
|
||||||
## data mode, performance or scale testing. These expose properties
|
## data mode, performance or scale testing. These expose properties
|
||||||
|
Loading…
Reference in New Issue
Block a user