allow describe_index to poll for completion

This commit is contained in:
Jonathan Shook 2024-03-26 18:26:56 -05:00
parent b194f5ee02
commit 0ba6427f15
15 changed files with 225 additions and 14 deletions

View File

@ -1,12 +1,12 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="index_build_progress local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="await_index local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.index_build_progress dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live index.await_index dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="drop local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="drop local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="load_collection local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="load_collection local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="rampup local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup_batch 100x local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="rampup_batch 100x local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup_batch 2x local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="rampup_batch 2x local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="schema local" type="JarApplication" folderName="milvus_vectors">
<configuration default="false" name="schema local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="schema_index local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.schema_index milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="search_and_verify threads=100 local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.search_and_verify dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; search_threads=100 search_cycles=100k milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.exceptions;
import io.milvus.param.index.DescribeIndexParam;
import io.nosqlbench.adapter.milvus.ops.MilvusDescribeIndexOp;
import java.util.List;
/**
 * Signals that a describe-index poll found indexing still incomplete and that
 * the polling op decided not to try again.
 *
 * <p>The exception carries the request that was polled, the number of attempts
 * made so far, and the per-index statistics observed on the last attempt, all
 * of which are rendered into {@link #getMessage()}.</p>
 */
public class MilvusIndexingIncompleteError extends RuntimeException {
    private final DescribeIndexParam milvusDescribeIndexOp;
    private final int tried;
    private final List<MilvusDescribeIndexOp.IndexStats> stats;

    /**
     * @param milvusDescribeIndexOp the describe-index request that was being polled
     * @param tried the number of attempts made when this error was raised
     * @param stats the index statistics seen on the final attempt
     */
    public MilvusIndexingIncompleteError(DescribeIndexParam milvusDescribeIndexOp, int tried, List<MilvusDescribeIndexOp.IndexStats> stats) {
        // Supply a base message; without one super.getMessage() is null and the
        // rendered message would begin with the literal text "null".
        super("indexing incomplete");
        this.milvusDescribeIndexOp = milvusDescribeIndexOp;
        this.tried = tried;
        this.stats = stats;
    }

    /**
     * @return the base message followed by the attempt count, the index, database and
     *     collection names taken from the request, and the last observed statistics
     */
    @Override
    public String getMessage() {
        return super.getMessage() + ": "
            + "tries:" + tried
            + ", index:" + milvusDescribeIndexOp.getIndexName()
            + ", database:" + milvusDescribeIndexOp.getDatabaseName()
            + ", collection:" + milvusDescribeIndexOp.getCollectionName()
            + ", stats:" + stats;
    }
}

View File

@ -18,7 +18,6 @@ package io.nosqlbench.adapter.milvus.opdispensers;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.index.DescribeIndexParam;
import io.milvus.param.partition.CreatePartitionParam;
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
import io.nosqlbench.adapter.milvus.ops.MilvusDescribeIndexOp;
@ -29,6 +28,9 @@ import java.util.function.LongFunction;
public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<DescribeIndexParam> {
private LongFunction<Boolean> doAwaitIndexFunction;
private LongFunction<Integer> awaitIndexTriesFunction;
public MilvusDescribeIndexOpDispenser(MilvusDriverAdapter adapter,
ParsedOp op,
LongFunction<String> targetFunction) {
@ -48,6 +50,9 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
ebF = op.enhanceFunc(ebF,List.of("database_name","database"),String.class,
DescribeIndexParam.Builder::withDatabaseName);
this.doAwaitIndexFunction = op.getAsFunctionOr("await_index", false);
this.awaitIndexTriesFunction = op.getAsFunctionOr("await_index_tries", 100);
final LongFunction<DescribeIndexParam.Builder> lastF = ebF;
final LongFunction<DescribeIndexParam> collectionParamF = l -> lastF.apply(l).build();
return collectionParamF;
@ -60,6 +65,11 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusDescribeIndexOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusDescribeIndexOp(
clientF.apply(l),
paramF.apply(l),
doAwaitIndexFunction.apply(l),
awaitIndexTriesFunction.apply(l)
);
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -50,6 +51,16 @@ public abstract class MilvusBaseOp<T> implements CycleOp<Object> {
try {
Object result = applyOp(value);
if (result instanceof R<?> r) {
var error = r.getException();
if (error!=null) {
throw error;
}
} else {
logger.warn("Op '" + this.toString() + "' did not return a Result 'R' type." +
" Exception handling will be bypassed"
);
}
return result;
} catch (Exception e) {
if (e instanceof RuntimeException rte) {

View File

@ -17,15 +17,88 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeIndexResponse;
import io.milvus.grpc.IndexDescription;
import io.milvus.param.R;
import io.milvus.param.index.DescribeIndexParam;
import io.nosqlbench.adapter.milvus.exceptions.MilvusIndexingIncompleteError;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
public class MilvusDescribeIndexOp extends MilvusBaseOp<DescribeIndexParam> {
public MilvusDescribeIndexOp(MilvusServiceClient client, DescribeIndexParam request) {
import java.util.ArrayList;
import java.util.List;
public class MilvusDescribeIndexOp extends MilvusBaseOp<DescribeIndexParam> implements OpGenerator {
private final boolean doPollTillIndexed;
private final int awaitIndexTries;
private int tried = 0;
private MilvusDescribeIndexOp nextOp;
private long lastAttemptAt=0L;
/**
 * Creates a describe-index op, optionally configured to poll index progress.
 *
 * @param client the Milvus client the request is issued through
 * @param request the describe-index request to send
 * @param doPollTillIndexed when true, {@code applyOp} inspects the indexing progress
 *     reported in the response instead of only returning it
 * @param awaitIndexTries the attempt threshold that the polling logic compares against
 *     the number of attempts already made
 */
public MilvusDescribeIndexOp(
    MilvusServiceClient client,
    DescribeIndexParam request,
    boolean doPollTillIndexed,
    int awaitIndexTries
) {
    super(client, request);
    // Polling configuration is fixed for the lifetime of the op.
    this.awaitIndexTries = awaitIndexTries;
    this.doPollTillIndexed = doPollTillIndexed;
}
/**
 * Sends the describe-index request and, when polling is enabled, decides from the
 * reported progress whether this op should be run again.
 *
 * <p>With polling enabled the outcome is one of:</p>
 * <ul>
 *   <li>progress is 100: no follow-up op is scheduled;</li>
 *   <li>progress is below 100 and fewer than {@code awaitIndexTries} attempts have
 *       been made: this op is scheduled as its own next op;</li>
 *   <li>progress is below 100 and the attempts are used up: an error is thrown.</li>
 * </ul>
 *
 * @param value the cycle value; not used by this op
 * @return the raw {@code R<DescribeIndexResponse>} returned by the client
 * @throws MilvusIndexingIncompleteError when polling is enabled, indexing is not
 *     complete, and the configured number of tries has been reached
 */
@Override
public Object applyOp(long value) {
    long attemptAt = System.currentTimeMillis();
    long gap = attemptAt - lastAttemptAt;
    if (gap < 500) {
        logger.warn("You are polling the state of indexes with an interval of only " + gap + "ms.");
    }
    lastAttemptAt = attemptAt;

    R<DescribeIndexResponse> describeIndexResponseR = client.describeIndex(request);
    tried++;

    if (!doPollTillIndexed) {
        return describeIndexResponseR;
    }

    DescribeIndexResponse data = describeIndexResponseR.getData();
    if (data == null) {
        // Nothing to inspect, so return the response unchanged rather than fail with a
        // NullPointerException while reading index descriptions.
        // NOTE(review): presumably this is a failed call whose exception the caller surfaces - confirm.
        this.nextOp = null;
        return describeIndexResponseR;
    }

    List<IndexStats> stats = getIndexStats(data);
    // NOTE(review): this takes the highest percentage across all reported indexes; confirm
    // that max (rather than min) is the intended completion criterion when there are several.
    int maxpct = stats.stream().mapToInt(IndexStats::percent).max().orElse(100);
    if (maxpct == 100) {
        logger.info("indexing percent at " + maxpct + " on try " + tried);
        this.nextOp = null;
    } else if (tried < awaitIndexTries) {
        // Attempts remain: run this same op again as the next op.
        logger.info("indexing percent at " + maxpct + " on try " + tried + ", retrying");
        this.nextOp = this;
    } else {
        // Attempts are used up and indexing is still incomplete.
        logger.info("indexing percent at " + maxpct + " on try " + tried + ", throwing error");
        throw new MilvusIndexingIncompleteError(request, tried, stats);
    }
    return describeIndexResponseR;
}
/**
 * Converts each index description in the response into an {@link IndexStats} entry
 * holding the index name, its indexed row count and its pending row count.
 *
 * @param data the describe-index response to read descriptions from
 * @return a new list with one entry per index description, in response order
 */
private List<IndexStats> getIndexStats(DescribeIndexResponse data) {
    List<IndexDescription> descriptions = data.getIndexDescriptionsList();
    List<IndexStats> collected = new ArrayList<>(descriptions.size());
    for (IndexDescription description : descriptions) {
        String indexName = description.getIndexName();
        long indexedRows = description.getIndexedRows();
        long pendingRows = description.getPendingIndexRows();
        collected.add(new IndexStats(indexName, indexedRows, pendingRows));
    }
    return collected;
}
/**
 * Row counts reported for a single index.
 *
 * <p>A nested record is implicitly static and every record is implicitly final, so
 * neither modifier is spelled out here.</p>
 *
 * @param index_name the name of the index
 * @param indexed_rows the number of rows already indexed
 * @param pending_rows the number of rows still waiting to be indexed
 */
public record IndexStats(
    String index_name,
    long indexed_rows,
    long pending_rows
) {
    /**
     * @return 100 when no rows are pending; otherwise the indexed share of
     *     {@code indexed_rows + pending_rows} as a percentage, truncated toward zero
     */
    public int percent() {
        if (pending_rows == 0) {
            return 100;
        }
        double total = (double) (indexed_rows + pending_rows);
        return (int) (100.0d * ((double) indexed_rows / total));
    }
}
/**
 * @return the op recorded by the most recent {@code applyOp} call to run next:
 *     this op when another poll was requested, or {@code null} when none is needed
 */
@Override
public Op getNextOp() {
    return this.nextOp;
}
}

View File

@ -17,6 +17,8 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.LoadCollectionParam;
public class MilvusLoadCollectionOp extends MilvusBaseOp<LoadCollectionParam> {
@ -26,6 +28,7 @@ public class MilvusLoadCollectionOp extends MilvusBaseOp<LoadCollectionParam> {
/**
 * Sends this op's load-collection request through the client and returns the
 * client's response object.
 *
 * @param value the cycle value; not used by this method
 * @return the {@code R<RpcStatus>} produced by {@code client.loadCollection}
 */
@Override
public Object applyOp(long value) {
return client.loadCollection(request);
R<RpcStatus> rpcStatusR = client.loadCollection(request);
return rpcStatusR;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.resultwrappers;
import io.milvus.grpc.GetIndexBuildProgressResponse;
/**
 * Thin wrapper around a {@link GetIndexBuildProgressResponse} that adds a
 * percentage view of the indexed-versus-total row counts.
 */
public class MVGetIndexBuildProgressRespones {
    private final GetIndexBuildProgressResponse r;

    /**
     * @param r the response whose row counts are exposed by this wrapper
     */
    public MVGetIndexBuildProgressRespones(GetIndexBuildProgressResponse r) {
        this.r = r;
    }

    /**
     * @return the indexed row count as reported by the wrapped response
     */
    public long getIndexedRows() {
        return r.getIndexedRows();
    }

    /**
     * @return the total row count as reported by the wrapped response
     */
    public long getTotalRows() {
        return r.getTotalRows();
    }

    /**
     * @return 100 when the indexed and total row counts are equal; otherwise the
     *     indexed-to-total ratio times 100, truncated to an int
     */
    public int getPercent() {
        long total = getTotalRows();
        long indexed = getIndexedRows();
        if (total == indexed) {
            return 100;
        }
        double ratio = (double) indexed / (double) total;
        return (int) (ratio * 100.0d);
    }
}