partial fixes before polling up updates

This commit is contained in:
Jonathan Shook 2024-03-25 17:34:06 -05:00
parent aaa1d1dcab
commit 1d96b00824
10 changed files with 110 additions and 16 deletions

View File

@ -47,6 +47,11 @@
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.24.0</version>
</dependency>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>

View File

@ -53,7 +53,7 @@ public class MilvusOpMapper implements OpMapper<MilvusBaseOp<?>> {
"type",
"target"
);
logger.info(() -> "Using '" + typeAndTarget.enumId + "' statement form for '" + op.getName() + "'");
logger.info(() -> "Using '" + typeAndTarget.enumId + "' op type for op template '" + op.getName() + "'");
return switch (typeAndTarget.enumId) {
case drop_collection -> new MilvusDropCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);

View File

@ -16,7 +16,12 @@
package io.nosqlbench.adapter.milvus.ops;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.GetIndexBuildProgressResponse;
import io.milvus.param.R;
import io.milvus.param.index.GetIndexBuildProgressParam;
public class MilvusGetIndexBuildProgressOp extends MilvusBaseOp<GetIndexBuildProgressParam> {
@ -26,6 +31,13 @@ public class MilvusGetIndexBuildProgressOp extends MilvusBaseOp<GetIndexBuildPro
@Override
public Object applyOp(long value) {
return client.getIndexBuildProgress(request);
R<GetIndexBuildProgressResponse> indexBuildProgress = client.getIndexBuildProgress(request);
GetIndexBuildProgressResponse r = indexBuildProgress.getData();
try {
String responseJson = JsonFormat.printer().print(r);
return responseJson;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 nosqlbench
* Copyright (c) 2022-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 nosqlbench
* Copyright (c) 2022-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,10 +14,11 @@
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 nosqlbench
* Copyright (c) 2022-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 nosqlbench
* Copyright (c) 2022-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,10 +14,11 @@
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
import io.nosqlbench.adapters.api.evalctx.CycleFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Predicate;
public class PollingOp<T> implements CycleOp<T>, OpGenerator {
private final static Logger logger = LogManager.getLogger(PollingOp.class);
private final CycleOp<T> innerOp;
private final CycleFunction<Boolean> untilCondition;
private PollingOp<T> nextOp = null;
public PollingOp(CycleOp<T> innerOp, CycleFunction<Boolean> untilCondition) {
this.innerOp = innerOp;
this.untilCondition = untilCondition;
}
@Override
public synchronized T apply(long value) {
T result = this.innerOp.apply(value);
untilCondition.setVariable("result",result);
boolean conditionIsMet = untilCondition.apply(value);
if (conditionIsMet) {
onConditionMet(result);
this.nextOp=null;
} else {
this.nextOp=this;
onConditionUnmet(result);
}
return result;
}
@Override
public synchronized Op getNextOp() {
return nextOp;
}
protected void onConditionMet(T value) {
logger.debug("for op " + this + ": condition MET for result " + value);
}
protected void onConditionUnmet(T value) {
logger.debug("for op " + this + ": condition UNMET for result " + value);
}
}

View File

@ -16,8 +16,10 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityimpl.uniform.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.PollingOpDispenserWrapper;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -50,7 +52,7 @@ import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DryRunOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
@ -420,7 +422,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
protected <O extends Op> OpSequence<OpDispenser<? extends O>> createOpSourceFromParsedOps(
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<?,?>> adapters,
List<DriverAdapter<?, ?>> adapters,
List<ParsedOp> pops
) {
try {
@ -470,7 +472,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
// }
planner.addOp((OpDispenser<? extends O>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(),e);
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
}
}
if (0 < dryrunCount) {
@ -569,7 +571,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
* @param opinit
* A function to map an OpTemplate to the executable operation form required by
* the native driver for this activity.
* @param defaultAdapter The adapter which will be used for any op templates with no explicit adapter
* @param defaultAdapter
* The adapter which will be used for any op templates with no explicit adapter
* @return The sequence of operations as determined by filtering and ratios
*/
@Deprecated(forRemoval = true)

View File

@ -1134,5 +1134,11 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
}
public Map<String,Object> getCombinedPrototype() {
Map<String,Object> prototype = new LinkedHashMap<>();
prototype.putAll(getDynamicPrototype());
prototype.putAll(getStaticPrototype());
prototype.putAll(getConfigPrototype());
return prototype;
}
}