mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
additional app logic to poll and verify milvus state becuase built-in sync ops are not reliable
This commit is contained in:
parent
1319f9842d
commit
8f93084b5d
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright (c) 2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.milvus.exceptions;
|
||||
|
||||
import io.milvus.grpc.GetLoadStateResponse;
|
||||
import io.milvus.param.R;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public class MilvusAwaitStateIncompleteError extends RuntimeException {
|
||||
private final R<GetLoadStateResponse> loadState;
|
||||
private final Duration timeout;
|
||||
private final String timeSummary;
|
||||
|
||||
public MilvusAwaitStateIncompleteError(R<GetLoadStateResponse> loadState, Duration timeout, String timeSummary) {
|
||||
this.loadState = loadState;
|
||||
this.timeout = timeout;
|
||||
this.timeSummary = timeSummary;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage() {
|
||||
return super.getMessage() + ": at time " +timeSummary;
|
||||
}
|
||||
}
|
@ -24,9 +24,9 @@ import java.util.List;
|
||||
public class MilvusIndexingIncompleteError extends RuntimeException {
|
||||
private final DescribeIndexParam milvusDescribeIndexOp;
|
||||
private final int tried;
|
||||
private final List<MilvusDescribeIndexOp.IndexStats> stats;
|
||||
private final List<MilvusDescribeIndexOp.IndexStat> stats;
|
||||
|
||||
public MilvusIndexingIncompleteError(DescribeIndexParam milvusDescribeIndexOp, int tried, List<MilvusDescribeIndexOp.IndexStats> stats) {
|
||||
public MilvusIndexingIncompleteError(DescribeIndexParam milvusDescribeIndexOp, int tried, List<MilvusDescribeIndexOp.IndexStat> stats) {
|
||||
this.milvusDescribeIndexOp = milvusDescribeIndexOp;
|
||||
this.tried = tried;
|
||||
this.stats = stats;
|
||||
|
@ -23,18 +23,26 @@ import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
|
||||
import io.nosqlbench.adapter.milvus.ops.MilvusDescribeIndexOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.List;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<DescribeIndexParam> {
|
||||
|
||||
private LongFunction<Boolean> doAwaitIndexFunction;
|
||||
private LongFunction<Integer> awaitIndexTriesFunction;
|
||||
private Duration awaitTimeout = Duration.ZERO;
|
||||
private Duration awaitInterval = Duration.of(10, ChronoUnit.SECONDS);
|
||||
|
||||
public MilvusDescribeIndexOpDispenser(MilvusDriverAdapter adapter,
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetFunction) {
|
||||
super(adapter, op, targetFunction);
|
||||
|
||||
op.getOptionalStaticValue("await_timeout", Number.class)
|
||||
.map(Number::doubleValue)
|
||||
.ifPresent(v->this.awaitTimeout = Duration.of((long)(v*1000), ChronoUnit.MILLIS));
|
||||
op.getOptionalStaticValue("await_interval", Number.class)
|
||||
.map(Number::doubleValue).ifPresent(v->this.awaitInterval =Duration.of((long)(v*1000),ChronoUnit.MILLIS));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -50,8 +58,6 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
|
||||
ebF = op.enhanceFunc(ebF,List.of("database_name","database"),String.class,
|
||||
DescribeIndexParam.Builder::withDatabaseName);
|
||||
|
||||
this.doAwaitIndexFunction = op.getAsFunctionOr("await_index", false);
|
||||
this.awaitIndexTriesFunction = op.getAsFunctionOr("await_index_tries", 100);
|
||||
|
||||
final LongFunction<DescribeIndexParam.Builder> lastF = ebF;
|
||||
final LongFunction<DescribeIndexParam> collectionParamF = l -> lastF.apply(l).build();
|
||||
@ -68,8 +74,8 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
|
||||
return l -> new MilvusDescribeIndexOp(
|
||||
clientF.apply(l),
|
||||
paramF.apply(l),
|
||||
doAwaitIndexFunction.apply(l),
|
||||
awaitIndexTriesFunction.apply(l)
|
||||
awaitTimeout,
|
||||
awaitInterval
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -17,23 +17,49 @@
|
||||
package io.nosqlbench.adapter.milvus.opdispensers;
|
||||
|
||||
import io.milvus.client.MilvusServiceClient;
|
||||
import io.milvus.grpc.LoadState;
|
||||
import io.milvus.param.collection.GetLoadStateParam;
|
||||
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
|
||||
import io.nosqlbench.adapter.milvus.MilvusAdapterUtils;
|
||||
import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
|
||||
import io.nosqlbench.adapter.milvus.ops.MilvusGetLoadStateOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoadStateParam> {
|
||||
|
||||
private Duration awaitTimeout = Duration.ZERO;
|
||||
private Duration awaitInterval = Duration.of(10, ChronoUnit.SECONDS);
|
||||
private LoadState awaitState = LoadState.UNRECOGNIZED;
|
||||
|
||||
public MilvusGetLoadStateOpDispenser(MilvusDriverAdapter adapter,
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetFunction) {
|
||||
super(adapter, op, targetFunction);
|
||||
op.getOptionalStaticValue("await_timeout", Number.class)
|
||||
.map(Number::doubleValue)
|
||||
.ifPresent(v->this.awaitTimeout=Duration.of((long)(v*1000),ChronoUnit.MILLIS));
|
||||
op.getOptionalStaticValue("await_interval", Number.class)
|
||||
.map(Number::doubleValue).ifPresent(v->this.awaitInterval=Duration.of((long)(v*1000),ChronoUnit.MILLIS));
|
||||
op.getOptionalStaticValue("await_state", String.class).ifPresent(s -> {
|
||||
var spec = s.toLowerCase();
|
||||
for (LoadState value : LoadState.values()) {
|
||||
if (value.name().toLowerCase().equals(spec) || value.name().toLowerCase().equals("loadstate" + spec)) {
|
||||
this.awaitState = value;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (this.awaitState == null) {
|
||||
throw new OpConfigError("Unrecognizable load state to await: " + spec);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -44,7 +70,7 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
|
||||
) {
|
||||
LongFunction<GetLoadStateParam.Builder> ebF =
|
||||
l -> GetLoadStateParam.newBuilder().withCollectionName(targetF.apply(l));
|
||||
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name","database"),String.class,
|
||||
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name", "database"), String.class,
|
||||
GetLoadStateParam.Builder::withDatabaseName);
|
||||
|
||||
Optional<LongFunction<String>> partitionsF = op.getAsOptionalFunction("partition_name", String.class);
|
||||
@ -54,6 +80,8 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
|
||||
ebF = l -> finalEbF.apply(l).withPartitionNames(MilvusAdapterUtils.splitNames(pfunc.apply(l)));
|
||||
}
|
||||
|
||||
|
||||
|
||||
final LongFunction<GetLoadStateParam.Builder> lastF = ebF;
|
||||
return l -> lastF.apply(l).build();
|
||||
}
|
||||
@ -65,6 +93,12 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetF
|
||||
) {
|
||||
return l -> new MilvusGetLoadStateOp(clientF.apply(l),paramF.apply(l));
|
||||
return l -> new MilvusGetLoadStateOp(
|
||||
clientF.apply(l),
|
||||
paramF.apply(l),
|
||||
this.awaitState,
|
||||
this.awaitTimeout,
|
||||
this.awaitInterval
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
package io.nosqlbench.adapter.milvus.ops;
|
||||
|
||||
import io.milvus.client.MilvusServiceClient;
|
||||
import io.milvus.grpc.DescribeCollectionResponse;
|
||||
import io.milvus.param.R;
|
||||
import io.milvus.param.collection.DescribeCollectionParam;
|
||||
|
||||
public class MilvusDescribeCollectionOp extends MilvusBaseOp<DescribeCollectionParam> {
|
||||
@ -26,6 +28,7 @@ public class MilvusDescribeCollectionOp extends MilvusBaseOp<DescribeCollectionP
|
||||
|
||||
@Override
|
||||
public Object applyOp(long value) {
|
||||
return client.describeCollection(request);
|
||||
R<DescribeCollectionResponse> describeCollectionResponseR = client.describeCollection(request);
|
||||
return describeCollectionResponseR;
|
||||
}
|
||||
}
|
||||
|
@ -21,72 +21,70 @@ import io.milvus.grpc.DescribeIndexResponse;
|
||||
import io.milvus.grpc.IndexDescription;
|
||||
import io.milvus.param.R;
|
||||
import io.milvus.param.index.DescribeIndexParam;
|
||||
import io.nosqlbench.adapter.milvus.exceptions.MilvusIndexingIncompleteError;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
|
||||
import io.nosqlbench.adapters.api.scheduling.TimeoutPredicate;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
public class MilvusDescribeIndexOp extends MilvusBaseOp<DescribeIndexParam> implements OpGenerator {
|
||||
private final boolean doPollTillIndexed;
|
||||
private final int awaitIndexTries;
|
||||
private int tried = 0;
|
||||
private final Duration timeout;
|
||||
private final Duration interval;
|
||||
private final TimeoutPredicate<Integer> timeoutPredicate;
|
||||
private MilvusDescribeIndexOp nextOp;
|
||||
private long lastAttemptAt = 0L;
|
||||
|
||||
public MilvusDescribeIndexOp(
|
||||
MilvusServiceClient client,
|
||||
DescribeIndexParam request,
|
||||
boolean doPollTillIndexed,
|
||||
int awaitIndexTries
|
||||
Duration timeout,
|
||||
Duration interval
|
||||
) {
|
||||
super(client, request);
|
||||
this.doPollTillIndexed = doPollTillIndexed;
|
||||
this.awaitIndexTries = awaitIndexTries;
|
||||
this.timeout = timeout;
|
||||
this.interval = interval;
|
||||
this.timeoutPredicate = TimeoutPredicate.of(p -> p>=100, timeout, interval, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object applyOp(long value) {
|
||||
long attemptAt = System.currentTimeMillis();
|
||||
long gap = attemptAt - lastAttemptAt;
|
||||
if (gap < 500) {
|
||||
logger.warn("You are polling index state at " + gap + "ms interval. Forcing 1S delay.");
|
||||
LockSupport.parkNanos(1_000_000_000L);
|
||||
}
|
||||
lastAttemptAt = attemptAt;
|
||||
nextOp = null;
|
||||
timeoutPredicate.blockUntilNextInterval();
|
||||
|
||||
R<DescribeIndexResponse> describeIndexResponseR = client.describeIndex(request);
|
||||
tried++;
|
||||
DescribeIndexResponse data = describeIndexResponseR.getData();
|
||||
|
||||
if (doPollTillIndexed) {
|
||||
this.nextOp = null;
|
||||
List<IndexStats> stats = getIndexStats(data);
|
||||
int maxpct = stats.stream().mapToInt(IndexStats::percent).max().orElse(100);
|
||||
if (maxpct < 100 && tried < awaitIndexTries) {
|
||||
logger.info("indexing at " + maxpct + "% on try " + tried + "/" + awaitIndexTries + ", retrying");
|
||||
this.nextOp = this;
|
||||
} else if (maxpct >= 100) {
|
||||
logger.info("indexing at " + maxpct + "% on try " + tried + "/" + awaitIndexTries + ", complete");
|
||||
} else { // tried >= awaitIndexTries
|
||||
logger.info("indexing at " + maxpct + " on try " + tried + "/" + awaitIndexTries + ", throwing error");
|
||||
throw new MilvusIndexingIncompleteError(request, tried, stats);
|
||||
}
|
||||
TimeoutPredicate.Result<Integer> result = timeoutPredicate.test(getIndexStats(data).percent());
|
||||
String message = result.status().name() + " await state " + result.value() + " at time " + result.timeSummary();
|
||||
logger.info(message);
|
||||
|
||||
if (result.isPending()) {
|
||||
this.nextOp=this;
|
||||
}
|
||||
|
||||
return describeIndexResponseR;
|
||||
}
|
||||
|
||||
private List<IndexStats> getIndexStats(DescribeIndexResponse data) {
|
||||
var stats = new ArrayList<IndexStats>();
|
||||
private IndexStats getIndexStats(DescribeIndexResponse data) {
|
||||
var stats = new ArrayList<IndexStat>();
|
||||
for (IndexDescription desc : data.getIndexDescriptionsList()) {
|
||||
stats.add(new IndexStats(desc.getIndexName(), desc.getIndexedRows(), desc.getPendingIndexRows()));
|
||||
stats.add(new IndexStat(desc.getIndexName(), desc.getIndexedRows(), desc.getPendingIndexRows()));
|
||||
}
|
||||
return stats;
|
||||
return new IndexStats(stats);
|
||||
}
|
||||
|
||||
public static final record IndexStats(
|
||||
public static class IndexStats extends ArrayList<IndexStat> {
|
||||
public IndexStats(List<IndexStat> stats) {
|
||||
super(stats);
|
||||
}
|
||||
|
||||
public int percent() {
|
||||
return stream().mapToInt(IndexStat::percent).max().orElse(0);
|
||||
}
|
||||
}
|
||||
public static final record IndexStat(
|
||||
String index_name,
|
||||
long indexed_rows,
|
||||
long pending_rows
|
||||
|
@ -17,15 +17,55 @@
|
||||
package io.nosqlbench.adapter.milvus.ops;
|
||||
|
||||
import io.milvus.client.MilvusServiceClient;
|
||||
import io.milvus.grpc.GetLoadStateResponse;
|
||||
import io.milvus.grpc.LoadState;
|
||||
import io.milvus.param.R;
|
||||
import io.milvus.param.collection.GetLoadStateParam;
|
||||
import io.nosqlbench.adapter.milvus.exceptions.MilvusAwaitStateIncompleteError;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
|
||||
import io.nosqlbench.adapters.api.scheduling.TimeoutPredicate;
|
||||
|
||||
public class MilvusGetLoadStateOp extends MilvusBaseOp<GetLoadStateParam> {
|
||||
public MilvusGetLoadStateOp(MilvusServiceClient client, GetLoadStateParam request) {
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
public class MilvusGetLoadStateOp extends MilvusBaseOp<GetLoadStateParam> implements OpGenerator {
|
||||
private final TimeoutPredicate<LoadState> timeoutPredicate;
|
||||
private int tried;
|
||||
private MilvusGetLoadStateOp nextOp;
|
||||
private long lastAttemptAt = 0L;
|
||||
|
||||
public MilvusGetLoadStateOp(
|
||||
MilvusServiceClient client,
|
||||
GetLoadStateParam request,
|
||||
LoadState awaitState,
|
||||
Duration timeout,
|
||||
Duration interval
|
||||
) {
|
||||
super(client, request);
|
||||
this.timeoutPredicate = TimeoutPredicate.of(s -> s==awaitState, timeout, interval, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object applyOp(long value) {
|
||||
return client.getLoadState(request);
|
||||
this.nextOp = null;
|
||||
timeoutPredicate.blockUntilNextInterval();
|
||||
R<GetLoadStateResponse> getLoadStateResponse = client.getLoadState(request);
|
||||
TimeoutPredicate.Result<LoadState> result = timeoutPredicate.test(getLoadStateResponse.getData().getState());
|
||||
|
||||
String message = result.status().name() + " await state " + result.value() + " at time " + result.timeSummary();
|
||||
logger.info(message);
|
||||
|
||||
if (result.status()== TimeoutPredicate.Status.pending) {
|
||||
nextOp=this;
|
||||
}
|
||||
|
||||
return getLoadStateResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Op getNextOp() {
|
||||
return this.nextOp;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,174 @@
|
||||
/*
|
||||
* Copyright (c) 2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapters.api.scheduling;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class TimeoutPredicate<T> {
|
||||
private final static Logger logger = LogManager.getLogger(TimeoutPredicate.class);
|
||||
|
||||
private final Supplier<T> source;
|
||||
private final Predicate<T> predicate;
|
||||
private final long timeoutNanos;
|
||||
private final long blockingNanos;
|
||||
private final boolean rethrow;
|
||||
private long pulseTime = 0L;
|
||||
private long startNanos;
|
||||
private long endNanos;
|
||||
|
||||
public static <PT> TimeoutPredicate<PT> of(
|
||||
Predicate<PT> o,
|
||||
Duration timeout,
|
||||
Duration interval,
|
||||
boolean b
|
||||
) {
|
||||
return new TimeoutPredicate<>(o, timeout, interval, b);
|
||||
}
|
||||
|
||||
public static <PT> TimeoutPredicate<PT> of(
|
||||
Supplier<PT> source, Predicate<PT> predicate, Duration timeout, Duration interval, boolean rethrow
|
||||
) {
|
||||
return new TimeoutPredicate<>(source, predicate, timeout, interval, rethrow);
|
||||
}
|
||||
|
||||
private TimeoutPredicate(
|
||||
Predicate<T> predicate,
|
||||
Duration timeout,
|
||||
Duration minBlockingInterval,
|
||||
boolean rethrow
|
||||
) {
|
||||
this(null, predicate, timeout, minBlockingInterval, rethrow);
|
||||
}
|
||||
|
||||
private TimeoutPredicate(
|
||||
Supplier<T> source,
|
||||
Predicate<T> predicate,
|
||||
Duration timeout,
|
||||
Duration minBlockingInterval,
|
||||
boolean rethrow
|
||||
) {
|
||||
this.source = source;
|
||||
this.predicate = Objects.requireNonNull(predicate);
|
||||
|
||||
timeoutNanos = Objects.requireNonNull(timeout).toNanos();
|
||||
blockingNanos = Objects.requireNonNull(minBlockingInterval).toNanos();
|
||||
startNanos = System.nanoTime();
|
||||
endNanos = startNanos + timeoutNanos;
|
||||
this.rethrow = rethrow;
|
||||
}
|
||||
|
||||
|
||||
public Result<T> test(T value) {
|
||||
long totalNanos = blockUntilNextInterval();
|
||||
|
||||
boolean isComplete = false;
|
||||
try {
|
||||
isComplete = predicate.test(value);
|
||||
long remaining = endNanos - pulseTime;
|
||||
if (isComplete) {
|
||||
return new Result<>(value, Status.complete, totalNanos, timeoutNanos, null);
|
||||
} else if (remaining > 0) {
|
||||
// System.out.println("pulse:" + pulseTime + " end:" + endNanos + " remaining:" + remaining);
|
||||
return new Result<>(value, Status.pending, totalNanos, timeoutNanos, null);
|
||||
} else {
|
||||
// System.out.println("pulse:" + pulseTime + " end:" + endNanos + " remaining:" + remaining);
|
||||
return new Result<>(value, Status.incomplete, totalNanos, timeoutNanos, null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("exception caught while evaluating timeout predicate:" + e, e);
|
||||
if (rethrow) throw new RuntimeException(e);
|
||||
return new Result<>(value, Status.error, totalNanos, timeoutNanos, new RuntimeException(e));
|
||||
}
|
||||
}
|
||||
|
||||
public Result<T> test() {
|
||||
Objects.requireNonNull(source);
|
||||
T value = source.get();
|
||||
return test(value);
|
||||
}
|
||||
|
||||
public long blockUntilNextInterval() {
|
||||
if (pulseTime == 0L) { // first try has no delay
|
||||
pulseTime = System.nanoTime();
|
||||
return 0L;
|
||||
}
|
||||
|
||||
long now = System.nanoTime();
|
||||
long targetNanos = Math.max(now, Math.min(endNanos, pulseTime + blockingNanos));
|
||||
while (now <= targetNanos) {
|
||||
LockSupport.parkNanos(targetNanos - now);
|
||||
now = System.nanoTime();
|
||||
}
|
||||
pulseTime = now;
|
||||
long currentTime = pulseTime - startNanos;
|
||||
return currentTime;
|
||||
}
|
||||
|
||||
public static enum Status {
|
||||
complete,
|
||||
pending,
|
||||
incomplete,
|
||||
error
|
||||
}
|
||||
|
||||
public static record Result<T>(
|
||||
T value,
|
||||
Status status,
|
||||
long duration_ns,
|
||||
long timeout_ns,
|
||||
RuntimeException exception
|
||||
) {
|
||||
public String timeSummary() {
|
||||
return statusDuration() + " / "
|
||||
+ timeoutDuration();
|
||||
}
|
||||
|
||||
public Duration statusDuration() {
|
||||
return Duration.of(duration_ns - (duration_ns % 1_000_000), ChronoUnit.NANOS);
|
||||
}
|
||||
|
||||
public Duration timeoutDuration() {
|
||||
return Duration.of(timeout_ns, ChronoUnit.NANOS);
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return status==Status.complete;
|
||||
}
|
||||
public boolean isIncomplete() {
|
||||
return status==Status.incomplete;
|
||||
}
|
||||
public boolean isPending() {
|
||||
return status==Status.pending;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "timeout:" + Duration.of(this.endNanos - this.startNanos, ChronoUnit.NANOS)
|
||||
+ ", current:" + Duration.of((this.endNanos - this.pulseTime), ChronoUnit.NANOS)
|
||||
+ ", interval:" + Duration.of(this.blockingNanos, ChronoUnit.NANOS);
|
||||
}
|
||||
}
|
@ -118,6 +118,7 @@ public class StrInterpolator implements Function<String, String> {
|
||||
|
||||
accesses.put(key,value);
|
||||
logger.debug("Template parameter '" + key + "' applied as '" + value + "'");
|
||||
// TODO summarize these to how many times
|
||||
return value;
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Copyright (c) 2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapters.api.scheduling;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class TimeoutPredicateTest {
|
||||
|
||||
@Test
|
||||
public void testNeverCompletablePreciate() {
|
||||
int interval=10;
|
||||
int timeout=500;
|
||||
|
||||
TimeoutPredicate<Boolean> wontMakeIt = TimeoutPredicate.of(
|
||||
()->false,
|
||||
l -> l,
|
||||
Duration.ofMillis(timeout),
|
||||
Duration.ofMillis(interval),
|
||||
true
|
||||
);
|
||||
|
||||
TimeoutPredicate.Result<Boolean> resultNow = wontMakeIt.test();
|
||||
assertThat(resultNow.duration_ns()).isEqualTo(0L);
|
||||
assertThat(resultNow.value()).isFalse();
|
||||
assertThat(resultNow.status()).isEqualTo(TimeoutPredicate.Status.pending);
|
||||
|
||||
resultNow = wontMakeIt.test();
|
||||
assertThat(resultNow.duration_ns()).isBetween(10*1_000_000L,50*1_000_000L);
|
||||
assertThat(resultNow.value()).isFalse();
|
||||
assertThat(resultNow.status()).isEqualTo(TimeoutPredicate.Status.pending);
|
||||
|
||||
while (resultNow.status()== TimeoutPredicate.Status.pending) {
|
||||
resultNow=wontMakeIt.test();
|
||||
}
|
||||
|
||||
assertThat(resultNow.status()).isEqualTo(TimeoutPredicate.Status.incomplete);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImmediatelyCompletablePreciate() {
|
||||
int interval=10;
|
||||
int timeout=5000;
|
||||
TimeoutPredicate<Boolean> canMakeIt = TimeoutPredicate.of(
|
||||
()->true,
|
||||
l -> l,
|
||||
Duration.ofMillis(timeout),
|
||||
Duration.ofMillis(interval),
|
||||
true
|
||||
);
|
||||
|
||||
TimeoutPredicate.Result<Boolean> resultNow = canMakeIt.test();
|
||||
assertThat(resultNow.duration_ns()).isEqualTo(0L);
|
||||
assertThat(resultNow.value()).isTrue();
|
||||
assertThat(resultNow.status()).isEqualTo(TimeoutPredicate.Status.complete);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventuallyCompletePredicate() {
|
||||
|
||||
int interval=250;
|
||||
int timeout=5000;
|
||||
long now = System.currentTimeMillis();
|
||||
long inASec = now+1000;
|
||||
TimeoutPredicate<Long> canMakeIt = TimeoutPredicate.of(
|
||||
System::currentTimeMillis,
|
||||
l -> l>inASec,
|
||||
Duration.ofMillis(timeout),
|
||||
Duration.ofMillis(interval),
|
||||
true
|
||||
);
|
||||
|
||||
TimeoutPredicate.Result<Long> result = canMakeIt.test();
|
||||
System.out.println(result);
|
||||
|
||||
while (result.status()== TimeoutPredicate.Status.pending) {
|
||||
// canMakeIt.blockUntilNextInterval();
|
||||
result=canMakeIt.test();
|
||||
System.out.println(canMakeIt);
|
||||
System.out.println(result);
|
||||
}
|
||||
|
||||
assertThat(result.status()).isEqualTo(TimeoutPredicate.Status.complete);
|
||||
}
|
||||
|
||||
}
|
@ -121,7 +121,8 @@ public class NBCLIOptions {
|
||||
private static final String GRAPHITE_LOG_LEVEL = "--graphite-log-level";
|
||||
private static final String REPORT_CSV_TO = "--report-csv-to";
|
||||
private static final String REPORT_SUMMARY_TO = "--report-summary-to";
|
||||
private static final String REPORT_SUMMARY_TO_DEFAULT = "stdout:60,_LOGS_/_SESSION__summary.txt";
|
||||
private static final String SUMMARY = "--summary";
|
||||
private static final String REPORT_SUMMARY_TO_DEFAULT = "_LOGS_/_SESSION__summary.txt";
|
||||
private static final String PROGRESS = "--progress";
|
||||
private static final String WITH_LOGGING_PATTERN = "--with-logging-pattern";
|
||||
private static final String LOGGING_PATTERN = "--logging-pattern";
|
||||
@ -203,7 +204,7 @@ public class NBCLIOptions {
|
||||
private boolean wantsConsoleMetrics = true;
|
||||
private String annotateLabelSpec = "";
|
||||
private String metricsLabelSpec = "";
|
||||
private String wantsToCatResource ="";
|
||||
private String wantsToCatResource = "";
|
||||
private long heartbeatIntervalMs = 10000;
|
||||
|
||||
public boolean wantsLoggedMetrics() {
|
||||
@ -271,7 +272,7 @@ public class NBCLIOptions {
|
||||
}
|
||||
|
||||
public boolean wantsToCatResource() {
|
||||
return this.wantsToCatResource!=null && !this.wantsToCatResource.isEmpty();
|
||||
return this.wantsToCatResource != null && !this.wantsToCatResource.isEmpty();
|
||||
}
|
||||
|
||||
public enum Mode {
|
||||
@ -596,6 +597,10 @@ public class NBCLIOptions {
|
||||
arglist.removeFirst();
|
||||
this.reportCsvTo = arglist.removeFirst();
|
||||
break;
|
||||
case NBCLIOptions.SUMMARY:
|
||||
arglist.removeFirst();
|
||||
this.reportSummaryTo = "stdout:0";
|
||||
break;
|
||||
case NBCLIOptions.REPORT_SUMMARY_TO:
|
||||
arglist.removeFirst();
|
||||
this.reportSummaryTo = this.readWordOrThrow(arglist, "report summary file");
|
||||
@ -644,7 +649,7 @@ public class NBCLIOptions {
|
||||
case HEARTBEAT_MILLIS:
|
||||
arglist.removeFirst();
|
||||
this.heartbeatIntervalMs =
|
||||
Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms"));
|
||||
Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms"));
|
||||
break;
|
||||
default:
|
||||
nonincludes.addLast(arglist.removeFirst());
|
||||
@ -673,8 +678,7 @@ public class NBCLIOptions {
|
||||
"""
|
||||
.replaceAll("ARG", cmdParam)
|
||||
.replaceAll("PROG", "nb5")
|
||||
.replaceAll("INCLUDES", String.join(",", wantsIncludes()))
|
||||
;
|
||||
.replaceAll("INCLUDES", String.join(",", wantsIncludes()));
|
||||
|
||||
final String debugMessage = """
|
||||
|
||||
@ -683,7 +687,7 @@ public class NBCLIOptions {
|
||||
COMMANDSTREAM
|
||||
"""
|
||||
.replaceAll("COMMANDSTREAM",
|
||||
String.join(" ",arglist));
|
||||
String.join(" ", arglist));
|
||||
if (consoleLevel.isGreaterOrEqualTo(NBLogLevel.INFO)) {
|
||||
System.out.println(debugMessage);
|
||||
}
|
||||
|
@ -226,13 +226,21 @@ The classic metrics logging format is used to report results into the logfile fo
|
||||
This format is not generally human-friendly, so a better summary report is provided by default to
|
||||
the console and/or a specified summary file by default.
|
||||
|
||||
By default, summaries are always reported to a summary file in the logs directory.
|
||||
It is highly recommended that you use this form in general. Users are often more interested
|
||||
in seeing play-by-play high-level details on console, and more human-readable forms of metrics
|
||||
summaries are easily created with other options.
|
||||
|
||||
Examples:
|
||||
|
||||
# report to auto-named summary file for every session
|
||||
--report-summary-to _LOGS_/_SESSION_.summary
|
||||
|
||||
# report to console if session ran more than 60 seconds
|
||||
--report-summary-to stdout:60
|
||||
|
||||
# report to auto-named summary file for every session
|
||||
--report-summary-to _LOGS_/_SESSION_.summary
|
||||
# simply enable reporting summary to console only, same as above
|
||||
--summary
|
||||
|
||||
# do both (the default)
|
||||
--report-summary-to stdout:60,_LOGS_/_SESSION_.summary
|
||||
|
Loading…
Reference in New Issue
Block a user