mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-26 08:41:05 -06:00
nosqlbench-830 limit spin-looping with cqld4 retryreplace option.
This commit is contained in:
parent
eaa8c5671e
commit
3677a60ec8
@ -24,8 +24,8 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, BoundStatement rebound, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,processors);
|
||||
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, int maxlwtretries, int lwtRetryCount, BoundStatement rebound, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,maxlwtretries,lwtRetryCount, processors);
|
||||
this.stmt = rebound;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.cqld4.exceptions;
|
||||
|
||||
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
|
||||
/**
|
||||
* This is a synthetic error generated by the cql driver in NoSQLBench when
|
||||
* the retryreplace option is used but the number of LWT round-trips from the driver
|
||||
* is excessive. The number of LWT round trips allowed is controlled by the
|
||||
* maxlwtretries op field.
|
||||
*/
|
||||
public class ExceededRetryReplaceException extends CqlGenericCycleException {
|
||||
|
||||
private final ResultSet resultSet;
|
||||
private final String queryString;
|
||||
private final int retries;
|
||||
|
||||
public ExceededRetryReplaceException(ResultSet resultSet, String queryString, int retries) {
|
||||
super("After " + retries + " retires using the retryreplace option, Operation was not applied:" + queryString);
|
||||
this.retries = retries;
|
||||
this.resultSet = resultSet;
|
||||
this.queryString = queryString;
|
||||
}
|
||||
|
||||
public ResultSet getResultSet() {
|
||||
return resultSet;
|
||||
}
|
||||
public String getQueryString() { return queryString; }
|
||||
}
|
@ -45,12 +45,14 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final boolean isRetryReplace;
|
||||
private final int maxLwtRetries;
|
||||
|
||||
public Cqld4BaseOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
||||
super(adapter, op);
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.maxpages = op.getStaticConfigOr("maxpages", 1);
|
||||
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
|
||||
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);
|
||||
}
|
||||
|
||||
public int getMaxPages() {
|
||||
@ -61,6 +63,11 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
return isRetryReplace;
|
||||
}
|
||||
|
||||
public int getMaxLwtRetries() {
|
||||
return maxLwtRetries;
|
||||
}
|
||||
|
||||
|
||||
public LongFunction<CqlSession> getSessionFunc() {
|
||||
return sessionFunc;
|
||||
}
|
||||
|
@ -89,6 +89,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
boundStatement,
|
||||
getMaxPages(),
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries(),
|
||||
processors
|
||||
);
|
||||
} catch (Exception exception) {
|
||||
|
@ -49,7 +49,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
getSessionFunc().apply(value),
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
getMaxPages(),
|
||||
isRetryReplace()
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
getSessionFunc().apply(value),
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
getMaxPages(),
|
||||
isRetryReplace()
|
||||
isRetryReplace(),
|
||||
getMaxLwtRetries()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -24,8 +24,8 @@ public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BatchStatement stmt;
|
||||
|
||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace) {
|
||||
super(session,maxpages,retryreplace,new RSProcessors());
|
||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, int maxlwtretries, boolean retryreplace) {
|
||||
super(session,maxpages,retryreplace,maxlwtretries,new RSProcessors());
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.*;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
|
||||
@ -46,15 +47,27 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
||||
private final CqlSession session;
|
||||
private final int maxpages;
|
||||
private final boolean retryreplace;
|
||||
private final int maxLwtRetries;
|
||||
private int retryReplaceCount =0;
|
||||
|
||||
private ResultSet rs;
|
||||
private Cqld4CqlOp nextOp;
|
||||
private final RSProcessors processors;
|
||||
|
||||
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, RSProcessors processors) {
|
||||
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, int maxLwtRetries, RSProcessors processors) {
|
||||
this.session = session;
|
||||
this.maxpages = maxpages;
|
||||
this.retryreplace = retryreplace;
|
||||
this.maxLwtRetries =maxLwtRetries;
|
||||
this.processors = processors;
|
||||
}
|
||||
|
||||
protected Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
|
||||
this.session = session;
|
||||
this.maxpages = maxpages;
|
||||
this.retryreplace = retryreplace;
|
||||
this.maxLwtRetries =maxLwtRetries;
|
||||
this.retryReplaceCount=retryRplaceCount;
|
||||
this.processors = processors;
|
||||
}
|
||||
|
||||
@ -69,6 +82,10 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
||||
if (!retryreplace) {
|
||||
throw new ChangeUnappliedCycleException(rs, getQueryString());
|
||||
} else {
|
||||
retryReplaceCount++;
|
||||
if (retryReplaceCount >maxLwtRetries) {
|
||||
throw new ExceededRetryReplaceException(rs,getQueryString(), retryReplaceCount);
|
||||
}
|
||||
Row one = rs.one();
|
||||
processors.buffer(one);
|
||||
totalRows++;
|
||||
@ -119,7 +136,7 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
||||
|
||||
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
|
||||
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
|
||||
return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, rebound, processors);
|
||||
return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, maxLwtRetries, retryReplaceCount, rebound, processors);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -24,8 +24,8 @@ public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,processors);
|
||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, int maxLwtRetries, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,maxLwtRetries,processors);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
|
@ -23,8 +23,8 @@ import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
||||
private final SimpleStatement stmt;
|
||||
|
||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace) {
|
||||
super(session, maxpages,retryreplace, new RSProcessors());
|
||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, int maxlwtretries) {
|
||||
super(session, maxpages,retryreplace, maxlwtretries, new RSProcessors());
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
|
@ -198,6 +198,11 @@ params:
|
||||
# match the preconditions) in order to test LWT performance.
|
||||
retryreplace: true
|
||||
|
||||
# Set the number of retries allowed by the retryreplace option. This is set
|
||||
# to 1 conservatively, as with the maxpages setting. This means that you will
|
||||
# see an error if the first LWT retry after an unapplied change was not successful.
|
||||
maxlwtretries: 1
|
||||
|
||||
## The following options are meant for advanced testing scenarios only,
|
||||
## and are not generally meant to be used in typical application-level,
|
||||
## data mode, performance or scale testing. These expose properties
|
||||
|
Loading…
Reference in New Issue
Block a user