merge fixups for neo4j

This commit is contained in:
Jonathan Shook 2024-04-11 22:53:18 -05:00
commit 6de6d0cf2f
177 changed files with 3469 additions and 891 deletions

View File

@ -79,7 +79,7 @@ jobs:
password: ${{ secrets.DOCKER_PASSWORD }}
- name: docker test build
uses: docker/build-push-action@v5.1.0
uses: docker/build-push-action@v5.3.0
with:
context: .
file: Dockerfile
@ -130,7 +130,7 @@ jobs:
scripts/bump-minor-version
- name: docker push to hub
uses: docker/build-push-action@v5.1.0
uses: docker/build-push-action@v5.3.0
with:
context: .
platforms: linux/amd64,linux/arm64
@ -141,7 +141,7 @@ jobs:
# https://github.com/softprops/action-gh-release
- name: create github release
uses: softprops/action-gh-release@v0.1.15
uses: softprops/action-gh-release@v2.0.4
if: startsWith(github.ref, 'refs/tags/')
with:
# body: ${{ steps.prepare_summary.outputs.release_summary }}

View File

@ -74,7 +74,7 @@ jobs:
password: ${{ secrets.DOCKER_PASSWORD }}
- name: docker test build
uses: docker/build-push-action@v5.1.0
uses: docker/build-push-action@v5.3.0
with:
context: .
file: Dockerfile
@ -115,7 +115,7 @@ jobs:
scripts/bump-minor-version
- name: docker push to hub
uses: docker/build-push-action@v5.1.0
uses: docker/build-push-action@v5.3.0
with:
context: .
platforms: linux/amd64,linux/arm64
@ -126,7 +126,7 @@ jobs:
# https://github.com/softprops/action-gh-release
- name: create github release
uses: softprops/action-gh-release@v0.1.15
uses: softprops/action-gh-release@v2.0.4
if: startsWith(github.ref, 'refs/tags/')
with:
# body: ${{ steps.prepare_summary.outputs.release_summary }}

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="await_index local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live index.await_index dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

14
.run/drop local.run.xml Normal file
View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="drop local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.drop milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="load_collection local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.load_collection milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

14
.run/rampup local.run.xml Normal file
View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live rampup dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; rampup_threads=1 rampup_cycles=1183600 milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup_batch 100x local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.rampup_batch dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; batch_size=100 rampup_threads=10 rampup_cycles=11836 milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="rampup_batch 2x local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.rampup_batch dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; batch_size=2 rampup_threads=1 rampup_cycles=600000 milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

14
.run/schema local.run.xml Normal file
View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="schema local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.schema milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="schema_index local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.schema_index milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="search_and_verify threads=1 local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.search_and_verify dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; search_threads=1 search_cycles=100k milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="search_and_verify threads=100 local" type="JarApplication" folderName="milvus_vectors local">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="milvus_vector_live milvus_vectors.search_and_verify dataset=&quot;vector/ANN/glove-25-angular/glove-25-angular&quot; search_threads=100 search_cycles=100k milvushost=localhost --show-stacktraces --progress console:1s -v" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/local/milvus" />
<option name="ALTERNATIVE_JRE_PATH" />
<method v="2" />
</configuration>
</component>

View File

@ -120,7 +120,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
@Override
public AmqpTimeTrackOp apply(long cycle) {
public AmqpTimeTrackOp getOp(long cycle) {
Channel channel = getAmqpChannelForReceiver(cycle);
if (channel == null) {
throw new AmqpAdapterUnexpectedException(

View File

@ -175,7 +175,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
}
@Override
public AmqpTimeTrackOp apply(long cycle) {
public AmqpTimeTrackOp getOp(long cycle) {
String msgPayload = msgPayloadFunc.apply(cycle);
if (StringUtils.isBlank(msgPayload)) {
throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!");

View File

@ -19,7 +19,6 @@ package io.nosqlbench.adapter.opensearch;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
@ -30,23 +29,23 @@ import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
@Service(value= DriverAdapter.class, selector = "opensearch")
public class OpenSearchAdapter extends BaseDriverAdapter<Op,OpenSearchSpace> {
public OpenSearchAdapter(NBComponent parentComponent, NBLabels labels) {
public class AOSAdapter extends BaseDriverAdapter<Op, AOSSpace> {
public AOSAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public Function<String, ? extends OpenSearchSpace> getSpaceInitializer(NBConfiguration cfg) {
return (String spaceName) -> new OpenSearchSpace(cfg);
public Function<String, ? extends AOSSpace> getSpaceInitializer(NBConfiguration cfg) {
return (String spaceName) -> new AOSSpace(cfg);
}
@Override
public OpMapper<Op> getOpMapper() {
return new OpenSearchOpMapper(this);
return new AOSOpMapper(this);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(OpenSearchSpace.getConfigModel());
return super.getConfigModel().add(AOSSpace.getConfigModel());
}
}

View File

@ -22,9 +22,9 @@ import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
@Service(value = DriverAdapterLoader.class,selector = "opensearch")
public class OpenSearchAdapterLoader implements DriverAdapterLoader {
public class AOSAdapterLoader implements DriverAdapterLoader {
@Override
public OpenSearchAdapter load(NBComponent parent, NBLabels childLabels) {
return new OpenSearchAdapter(parent, childLabels);
public AOSAdapter load(NBComponent parent, NBLabels childLabels) {
return new AOSAdapter(parent, childLabels);
}
}

View File

@ -17,35 +17,31 @@
package io.nosqlbench.adapter.opensearch;
import io.nosqlbench.adapter.opensearch.dispensers.*;
import io.nosqlbench.adapter.opensearch.ops.UpdateOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import org.opensearch.client.opensearch.OpenSearchClient;
public class OpenSearchOpMapper implements OpMapper<Op> {
private final OpenSearchAdapter adapter;
public class AOSOpMapper implements OpMapper<Op> {
private final AOSAdapter adapter;
public OpenSearchOpMapper(OpenSearchAdapter openSearchAdapter) {
this.adapter = openSearchAdapter;
public AOSOpMapper(AOSAdapter AOSAdapter) {
this.adapter = AOSAdapter;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp op) {
TypeAndTarget<OpenSearchOpTypes, String> typeAndTarget =
op.getTypeAndTarget(OpenSearchOpTypes.class, String.class, "verb", "index");
TypeAndTarget<AOSOpTypes, String> typeAndTarget =
op.getTypeAndTarget(AOSOpTypes.class, String.class, "verb", "index");
return switch (typeAndTarget.enumId) {
case create_index -> new CreateIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case delete_index -> new DeleteIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case index -> new IndexOpDispenser(adapter,op, typeAndTarget.targetFunction);
case update -> new UpdateOpDispenser(adapter,op, typeAndTarget.targetFunction);
case delete -> new DeleteOpDispenser(adapter,op, typeAndTarget.targetFunction);
case knn_search -> new KnnSearchOpDispenser(adapter,op, typeAndTarget.targetFunction);
case bulk -> new BulkOpDispenser(adapter, op, typeAndTarget.targetFunction);
case create_index -> new AOSCreateIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case delete_index -> new AOSDeleteIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case index -> new AOSIndexOpDispenser(adapter,op, typeAndTarget.targetFunction);
case update -> new AOSUpdateOpDispenser(adapter,op, typeAndTarget.targetFunction);
case delete -> new AOSDeleteOpDispenser(adapter,op, typeAndTarget.targetFunction);
case knn_search -> new AOSKnnSearchOpDispenser(adapter,op, typeAndTarget.targetFunction);
case bulk -> new AOSBulkOpDispenser(adapter, op, typeAndTarget.targetFunction);
default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " +
"mapping parsed op " + op);
};

View File

@ -16,7 +16,7 @@
package io.nosqlbench.adapter.opensearch;
public enum OpenSearchOpTypes {
public enum AOSOpTypes {
create_index,
delete_index,
index,

View File

@ -16,7 +16,7 @@
package io.nosqlbench.adapter.opensearch;
public enum AwsOsServiceType {
public enum AOSServiceType {
aoss,
es
}

View File

@ -17,10 +17,6 @@
package io.nosqlbench.adapter.opensearch;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.profile.internal.BasicProfile;
import com.amazonaws.auth.profile.internal.ProfileKeyConstants;
import com.amazonaws.auth.profile.internal.ProfileStaticCredentialsProvider;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
@ -35,14 +31,13 @@ import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import java.io.IOException;
import java.util.Map;
public class OpenSearchSpace implements AutoCloseable {
public class AOSSpace implements AutoCloseable {
private final NBConfiguration cfg;
protected OpenSearchClient client;
public OpenSearchSpace(NBConfiguration cfg) {
public AOSSpace(NBConfiguration cfg) {
this.cfg = cfg;
}
@ -75,7 +70,7 @@ public class OpenSearchSpace implements AutoCloseable {
AwsSdk2TransportOptions transportOptions = transportOptionsBuilder.build();
AwsOsServiceType svctype = AwsOsServiceType.valueOf(cfg.get("svctype"));
AOSServiceType svctype = AOSServiceType.valueOf(cfg.get("svctype"));
AwsSdk2Transport awsSdk2Transport =
new AwsSdk2Transport(
@ -101,7 +96,7 @@ public class OpenSearchSpace implements AutoCloseable {
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(OpenSearchSpace.class)
return ConfigModel.of(AOSSpace.class)
.add(Param.required("region", String.class).setDescription("The region to connect to"))
.add(Param.required("host", String.class).setDescription("The Open Search API endpoint host"))
.add(Param.optional("profile")

View File

@ -20,7 +20,7 @@ import io.nosqlbench.adapter.opensearch.pojos.Doc;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.core.search.Hit;
public class Utils {
public class AOSUtils {
public static int[] DocHitsToIntIndicesArray(SearchResponse<Doc> response) {
int[] indices = response.hits().hits()

View File

@ -16,8 +16,8 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.OpenSearchSpace;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.AOSSpace;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
@ -25,12 +25,12 @@ import org.opensearch.client.opensearch.OpenSearchClient;
import java.util.function.LongFunction;
public abstract class BaseOpenSearchOpDispenser extends BaseOpDispenser<Op,Object> {
protected final LongFunction<OpenSearchSpace> spaceF;
public abstract class AOSBaseOpDispenser extends BaseOpDispenser<Op,Object> {
protected final LongFunction<AOSSpace> spaceF;
protected final LongFunction<OpenSearchClient> clientF;
private final LongFunction<? extends Op> opF;
protected BaseOpenSearchOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
protected AOSBaseOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op);
this.spaceF =adapter.getSpaceFunc(op);
this.clientF = (long l) -> this.spaceF.apply(l).getClient();
@ -44,7 +44,7 @@ public abstract class BaseOpenSearchOpDispenser extends BaseOpDispenser<Op,Objec
);
@Override
public Op apply(long value) {
public Op getOp(long value) {
return opF.apply(value);
}
}

View File

@ -18,34 +18,19 @@ package io.nosqlbench.adapter.opensearch.dispensers;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.BulkOp;
import io.nosqlbench.adapter.opensearch.ops.IndexOp;
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpDef;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpDef;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSBulkOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpType;
import org.opensearch.client.opensearch._types.Refresh;
import org.opensearch.client.opensearch._types.VersionType;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.snakeyaml.engine.v2.api.lowlevel.Parse;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class BulkOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSBulkOpDispenser extends AOSBaseOpDispenser {
private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
public BulkOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSBulkOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
}
@ -55,8 +40,8 @@ public class BulkOpDispenser extends BaseOpenSearchOpDispenser {
ParsedOp op,
LongFunction<String> targetF
) {
LongFunction<BulkRequest> func = OpenSearchRequests.bulk(op,targetF);
return l -> new BulkOp(clientF.apply(l), func.apply(l));
LongFunction<BulkRequest> func = AOSRequests.bulk(op,targetF);
return l -> new AOSBulkOp(clientF.apply(l), func.apply(l));
}
}

View File

@ -16,7 +16,7 @@
package io.nosqlbench.adapter.opensearch.dispensers;
public enum BulkOpTypes {
public enum AOSBulkOpTypes {
create,
index,
delete,

View File

@ -16,26 +16,25 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.CreateIndexOp;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSCreateIndexOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.*;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.IndexSettings;
import java.util.Map;
import java.util.function.LongFunction;
public class CreateIndexOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSCreateIndexOpDispenser extends AOSBaseOpDispenser {
private final ParsedOp pop;
private final int dimensions;
private final int ef_construction;
private final int m;
public CreateIndexOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSCreateIndexOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
this.pop = op;
this.dimensions = pop.getStaticValue("dimensions",Integer.class).intValue();
@ -44,8 +43,8 @@ public class CreateIndexOpDispenser extends BaseOpenSearchOpDispenser {
}
@Override
public LongFunction<CreateIndexOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op,
LongFunction<String> targetF) {
public LongFunction<AOSCreateIndexOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op,
LongFunction<String> targetF) {
CreateIndexRequest.Builder eb = new CreateIndexRequest.Builder();
LongFunction<CreateIndexRequest.Builder> bfunc =
l -> new CreateIndexRequest.Builder()
@ -54,7 +53,7 @@ public class CreateIndexOpDispenser extends BaseOpenSearchOpDispenser {
bfunc = op.enhanceFunc(bfunc, "mappings", Map.class, this::resolveTypeMapping);
LongFunction<CreateIndexRequest.Builder> finalBfunc = bfunc;
return (long l) -> new CreateIndexOp(clientF.apply(l), finalBfunc.apply(l).build());
return (long l) -> new AOSCreateIndexOp(clientF.apply(l), finalBfunc.apply(l).build());
}
// https://opensearch.org/docs/latest/search-plugins/knn/knn-index/

View File

@ -16,26 +16,25 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.DeleteIndexOp;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSDeleteIndexOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import java.util.Map;
import java.util.function.LongFunction;
public class DeleteIndexOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSDeleteIndexOpDispenser extends AOSBaseOpDispenser {
public DeleteIndexOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSDeleteIndexOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
}
@Override
public LongFunction<DeleteIndexOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
public LongFunction<AOSDeleteIndexOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
DeleteIndexRequest.Builder eb = new DeleteIndexRequest.Builder();
LongFunction<DeleteIndexRequest.Builder> f =
l -> new DeleteIndexRequest.Builder().index(targetF.apply(l));
return l -> new DeleteIndexOp(clientF.apply(l),f.apply(1).build());
return l -> new AOSDeleteIndexOp(clientF.apply(l),f.apply(1).build());
}
}

View File

@ -16,30 +16,25 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.CreateIndexOp;
import io.nosqlbench.adapter.opensearch.ops.DeleteOp;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSDeleteOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.*;
import org.opensearch.client.opensearch.core.DeleteRequest;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import java.util.Map;
import java.util.function.LongFunction;
public class DeleteOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSDeleteOpDispenser extends AOSBaseOpDispenser {
public DeleteOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSDeleteOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
}
@Override
public LongFunction<DeleteOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
public LongFunction<AOSDeleteOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
DeleteRequest.Builder eb = new DeleteRequest.Builder();
LongFunction<DeleteRequest.Builder> bfunc = l -> new DeleteRequest.Builder().index(targetF.apply(l));
return (long l) -> new DeleteOp(clientF.apply(l), bfunc.apply(l).build());
return (long l) -> new AOSDeleteOp(clientF.apply(l), bfunc.apply(l).build());
}
}

View File

@ -18,8 +18,8 @@ package io.nosqlbench.adapter.opensearch.dispensers;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.IndexOp;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSIndexOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -29,20 +29,20 @@ import org.opensearch.client.opensearch.core.IndexRequest;
import java.util.function.LongFunction;
public class IndexOpDispenser extends BaseOpenSearchOpDispenser {
private final static Logger logger = LogManager.getLogger(IndexOpDispenser.class);
public class AOSIndexOpDispenser extends AOSBaseOpDispenser {
private final static Logger logger = LogManager.getLogger(AOSIndexOpDispenser.class);
private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
private final String diag;
public IndexOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSIndexOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
this.diag = op.getStaticConfigOr("daig","false");
}
@Override
public LongFunction<? extends Op> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
LongFunction<IndexRequest> irqF = OpenSearchRequests.index(op);
return l -> new IndexOp(clientF.apply(l), irqF.apply(l));
LongFunction<IndexRequest> irqF = AOSRequests.index(op);
return l -> new AOSIndexOp(clientF.apply(l), irqF.apply(l));
}
}

View File

@ -16,9 +16,8 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.KnnSearchOp;
import io.nosqlbench.adapter.opensearch.pojos.Doc;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSKnnSearchOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
@ -32,10 +31,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class KnnSearchOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSKnnSearchOpDispenser extends AOSBaseOpDispenser {
private Class<?> schemaClass;
public KnnSearchOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSKnnSearchOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
String schemaClassStr = op.getStaticConfigOr("schema", "io.nosqlbench.adapter.opensearch.pojos.Doc");
try {
@ -46,7 +45,7 @@ public class KnnSearchOpDispenser extends BaseOpenSearchOpDispenser {
}
@Override
public LongFunction<KnnSearchOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op,LongFunction<String> targetF) {
public LongFunction<AOSKnnSearchOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
LongFunction<KnnQuery.Builder> knnfunc = l -> new KnnQuery.Builder();
knnfunc = op.enhanceFuncOptionally(knnfunc, "k",Integer.class, KnnQuery.Builder::k);
knnfunc = op.enhanceFuncOptionally(knnfunc, "vector", List.class, this::convertVector);
@ -64,7 +63,7 @@ public class KnnSearchOpDispenser extends BaseOpenSearchOpDispenser {
.index(targetF.apply(l))
.query(new Query.Builder().knn(finalKnnfunc.apply(l).build()).build());
return (long l) -> new KnnSearchOp(clientF.apply(l), bfunc.apply(l).build(), schemaClass);
return (long l) -> new AOSKnnSearchOp(clientF.apply(l), bfunc.apply(l).build(), schemaClass);
}
private LongFunction<Query> buildFilterQuery(LongFunction<Map> mapLongFunction) {

View File

@ -35,13 +35,11 @@ import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class OpenSearchRequests {
public class AOSRequests {
private final static Logger logger = LogManager.getLogger(IndexOpDispenser.class);
private final static Logger logger = LogManager.getLogger(AOSIndexOpDispenser.class);
private static Gson gson = new GsonBuilder().setPrettyPrinting().create();
public static <T> LongFunction<BulkRequest> bulk(ParsedOp op, LongFunction<String> targetF) {
@ -57,12 +55,12 @@ public class OpenSearchRequests {
ParsedOp subop = op.getAsSubOp("op_template", ParsedOp.SubOpNaming.ParentAndSubKey);
int repeat = subop.getStaticConfigOr("repeat", 1);
TypeAndTarget<BulkOpTypes, String> typeinfo =
subop.getTypeAndTarget(BulkOpTypes.class, String.class);
TypeAndTarget<AOSBulkOpTypes, String> typeinfo =
subop.getTypeAndTarget(AOSBulkOpTypes.class, String.class);
LongFunction<BulkOperationVariant> bop = switch (typeinfo.enumId) {
case create -> OpenSearchRequests.createOperation(subop);
case index -> OpenSearchRequests.indexOperation(subop);
case create -> AOSRequests.createOperation(subop);
case index -> AOSRequests.indexOperation(subop);
default -> throw new OpConfigError("Unsupported type in bulk operation: '" + typeinfo.enumId + "'");
};
@ -116,7 +114,7 @@ public class OpenSearchRequests {
func = op.enhanceFuncOptionally(func, "version", long.class, IndexRequest.Builder::version);
func = op.enhanceEnumOptionally(func, "opType", OpType.class, IndexRequest.Builder::opType);
func = op.enhanceEnumOptionally(func, "versionType", VersionType.class, IndexRequest.Builder::versionType);
func = op.enhanceFuncPivot(func, "document", Object.class, OpenSearchRequests::bindDocument);
func = op.enhanceFuncPivot(func, "document", Object.class, AOSRequests::bindDocument);
LongFunction<IndexRequest.Builder> finalFunc1 = func;
return l -> finalFunc1.apply(l).build();
}

View File

@ -16,30 +16,25 @@
package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.CreateIndexOp;
import io.nosqlbench.adapter.opensearch.ops.UpdateOp;
import io.nosqlbench.adapter.opensearch.AOSAdapter;
import io.nosqlbench.adapter.opensearch.ops.AOSUpdateOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.*;
import org.opensearch.client.opensearch.core.UpdateRequest;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import java.util.Map;
import java.util.function.LongFunction;
public class UpdateOpDispenser extends BaseOpenSearchOpDispenser {
public class AOSUpdateOpDispenser extends AOSBaseOpDispenser {
public UpdateOpDispenser(OpenSearchAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
public AOSUpdateOpDispenser(AOSAdapter adapter, ParsedOp op, LongFunction<String> targetF) {
super(adapter, op, targetF);
}
@Override
public LongFunction<UpdateOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
public LongFunction<AOSUpdateOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op, LongFunction<String> targetF) {
LongFunction<UpdateRequest.Builder> bfunc = l -> new UpdateRequest.Builder().index(targetF.apply(l));
// TODO: add details here
return l -> new UpdateOp(clientF.apply(l),bfunc.apply(l).build(),Object.class);
return l -> new AOSUpdateOp(clientF.apply(l),bfunc.apply(l).build(),Object.class);
}
}

View File

@ -19,10 +19,10 @@ package io.nosqlbench.adapter.opensearch.ops;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.opensearch.client.opensearch.OpenSearchClient;
public abstract class BaseOpenSearchOp implements CycleOp<Object> {
public abstract class AOSBaseOp implements CycleOp<Object> {
protected final OpenSearchClient client;
public BaseOpenSearchOp(OpenSearchClient client) {
public AOSBaseOp(OpenSearchClient client) {
this.client = client;
}

View File

@ -20,17 +20,14 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import java.io.IOException;
public class BulkOp extends BaseOpenSearchOp {
private final static Logger logger = LogManager.getLogger(BulkOp.class);
public class AOSBulkOp extends AOSBaseOp {
private final static Logger logger = LogManager.getLogger(AOSBulkOp.class);
private final BulkRequest rq;
public BulkOp(OpenSearchClient client, BulkRequest rq) {
public AOSBulkOp(OpenSearchClient client, BulkRequest rq) {
super(client);
this.rq = rq;
}

View File

@ -20,10 +20,10 @@ import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.CreateIndexResponse;
public class CreateIndexOp extends BaseOpenSearchOp {
public class AOSCreateIndexOp extends AOSBaseOp {
private final CreateIndexRequest rq;
public CreateIndexOp(OpenSearchClient client, CreateIndexRequest rq) {
public AOSCreateIndexOp(OpenSearchClient client, CreateIndexRequest rq) {
super(client);
this.rq = rq;
}

View File

@ -17,15 +17,14 @@
package io.nosqlbench.adapter.opensearch.ops;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import java.io.IOException;
public class DeleteIndexOp extends BaseOpenSearchOp {
public class AOSDeleteIndexOp extends AOSBaseOp {
private final DeleteIndexRequest rq;
public DeleteIndexOp(OpenSearchClient client, DeleteIndexRequest rq) {
public AOSDeleteIndexOp(OpenSearchClient client, DeleteIndexRequest rq) {
super(client);
this.rq = rq;
}

View File

@ -18,14 +18,13 @@ package io.nosqlbench.adapter.opensearch.ops;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.DeleteRequest;
import org.opensearch.client.opensearch.core.IndexRequest;
import java.io.IOException;
public class DeleteOp extends BaseOpenSearchOp {
public class AOSDeleteOp extends AOSBaseOp {
private final DeleteRequest rq;
public DeleteOp(OpenSearchClient client, DeleteRequest rq) {
public AOSDeleteOp(OpenSearchClient client, DeleteRequest rq) {
super(client);
this.rq = rq;
}

View File

@ -21,15 +21,14 @@ import org.apache.logging.log4j.Logger;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.IndexRequest;
import org.opensearch.client.opensearch.core.IndexResponse;
import org.opensearch.client.opensearch.core.UpdateRequest;
import java.io.IOException;
public class IndexOp extends BaseOpenSearchOp {
private final static Logger logger = LogManager.getLogger(IndexOp.class);
public class AOSIndexOp extends AOSBaseOp {
private final static Logger logger = LogManager.getLogger(AOSIndexOp.class);
private final IndexRequest<?> rq;
public IndexOp(OpenSearchClient client, IndexRequest<?> rq) {
public AOSIndexOp(OpenSearchClient client, IndexRequest<?> rq) {
super(client);
this.rq = rq;
}

View File

@ -19,14 +19,12 @@ package io.nosqlbench.adapter.opensearch.ops;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.SearchResponse;
import org.opensearch.client.opensearch.indices.GetIndexRequest;
import org.opensearch.client.opensearch.indices.GetIndexResponse;
public class KnnSearchOp extends BaseOpenSearchOp {
public class AOSKnnSearchOp extends AOSBaseOp {
private final SearchRequest rq;
private final Class<?> doctype;
public KnnSearchOp(OpenSearchClient client, SearchRequest rq, Class<?> doctype) {
public AOSKnnSearchOp(OpenSearchClient client, SearchRequest rq, Class<?> doctype) {
super(client);
this.rq = rq;
this.doctype = doctype;

View File

@ -21,11 +21,11 @@ import org.opensearch.client.opensearch.core.UpdateRequest;
import java.io.IOException;
public class UpdateOp extends BaseOpenSearchOp {
public class AOSUpdateOp extends AOSBaseOp {
private final UpdateRequest rq;
private final Class<?> doctype;
public UpdateOp(OpenSearchClient client, UpdateRequest rq, Class<?> doctype) {
public AOSUpdateOp(OpenSearchClient client, UpdateRequest rq, Class<?> doctype) {
super(client);
this.rq = rq;
this.doctype = doctype;

View File

@ -98,12 +98,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.16.1</version>
<version>2.17.0</version>
</dependency>
</dependencies>

View File

@ -0,0 +1,97 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import io.nosqlbench.adapter.cqld4.optionhelpers.BatchTypeEnum;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlBatchStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.jetbrains.annotations.NotNull;
import java.util.function.LongFunction;
public class CqlD4BatchStmtDispenser extends Cqld4BaseOpDispenser {
private final int repeat;
private final ParsedOp subop;
private final OpMapper submapper;
private LongFunction<Statement> opfunc;
public CqlD4BatchStmtDispenser(
DriverAdapter adapter,
LongFunction<CqlSession> sessionFunc,
ParsedOp op,
int repeat,
ParsedOp subop,
OpDispenser<? extends Cqld4CqlOp> subopDispenser
) {
super(adapter, sessionFunc, op);
this.repeat = repeat;
this.subop = subop;
this.opfunc = createStmtFunc(op, subopDispenser);
this.submapper = adapter.getOpMapper();
subopDispenser = submapper.apply(subop);
}
private LongFunction<Statement> createStmtFunc(ParsedOp topOp, OpDispenser<? extends Cqld4CqlOp> subopDispenser) {
Cqld4CqlOp exampleOp = subopDispenser.apply(0L);
Statement<?> example = exampleOp.getStmt();
if (!(example instanceof BatchableStatement<?> b)) {
throw new RuntimeException("Statement type '" + example.getClass().getCanonicalName() + " is not " +
"batchable. query=" + exampleOp.getQueryString());
}
BatchTypeEnum bte = topOp.getEnumFromFieldOr(BatchTypeEnum.class, BatchTypeEnum.unlogged, "batchtype");
LongFunction<BatchStatementBuilder> bsbf = l -> new BatchStatementBuilder(bte.batchtype);
LongFunction<Statement> bsf = getBatchAccumulator(bsbf, subopDispenser);
bsf = getEnhancedStmtFunc(bsf,topOp);
return bsf;
}
@NotNull
private LongFunction<Statement> getBatchAccumulator(LongFunction<BatchStatementBuilder> bsb, OpDispenser<? extends Cqld4CqlOp> subopDispenser) {
LongFunction<BatchStatementBuilder> f = l -> {
BatchStatementBuilder bsa = bsb.apply(l);
for (int i = 0; i < repeat; i++) {
Cqld4CqlOp op = subopDispenser.apply(i+l);
BatchableStatement<?> stmt = (BatchableStatement<?>) op.getStmt();
bsa= bsa.addStatement(stmt);
}
return bsa;
};
LongFunction<Statement> bsf = (long l) -> f.apply(l).build();
return bsf;
}
@Override
public Cqld4CqlOp getOp(long value) {
Statement bstmt = opfunc.apply(value);
return new Cqld4CqlBatchStatement(
getSessionFunc().apply(value),
(BatchStatement) bstmt,
getMaxPages(),
getMaxLwtRetries(),
isRetryReplace(),
this
);
}
}

View File

@ -35,7 +35,7 @@ public class CqlD4RainbowTableDispenser extends Cqld4BaseOpDispenser {
}
@Override
public Cqld4CqlOp apply(long cycle) {
public Cqld4CqlOp getOp(long cycle) {
throw new RuntimeException("implement me");
// return new Cqld4RainbowTableOp(
// getSessionFunc().apply(value),

View File

@ -57,7 +57,7 @@ public class Cqld4FluentGraphOpDispenser extends BaseOpDispenser<Op, Cqld4Space>
}
@Override
public Op apply(long value) {
public Op getOp(long value) {
String graphname = graphnameFunc.apply(value);
Script script = tlScript.get();
Map<String, Object> allMap = virtdataBindings.getAllMap(value);

View File

@ -55,7 +55,7 @@ public class Cqld4GremlinOpDispenser extends BaseOpDispenser<Cqld4ScriptGraphOp,
}
@Override
public Cqld4ScriptGraphOp apply(long value) {
public Cqld4ScriptGraphOp getOp(long value) {
ScriptGraphStatement stmt = stmtFunc.apply(value);
if (diagFunc.apply(value)>0L) {
System.out.println("## GREMLIN DIAG: ScriptGraphStatement on graphname(" + stmt.getGraphName() + "):\n" + stmt.getScript());

View File

@ -85,7 +85,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
}
@Override
public Cqld4CqlOp apply(long cycle) {
public Cqld4CqlOp getOp(long cycle) {
BoundStatement boundStatement;
try {

View File

@ -44,7 +44,7 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
}
@Override
public Cqld4CqlOp apply(long value) {
public Cqld4CqlOp getOp(long value) {
return new Cqld4CqlSimpleStatement(
getSessionFunc().apply(value),
(SimpleStatement) stmtFunc.apply(value),

View File

@ -41,7 +41,7 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
}
@Override
public Cqld4CqlSimpleStatement apply(long value) {
public Cqld4CqlSimpleStatement getOp(long value) {
return new Cqld4CqlSimpleStatement(
getSessionFunc().apply(value),
(SimpleStatement) stmtFunc.apply(value),

View File

@ -40,7 +40,7 @@ public class Cqld4SsTableDispenser extends Cqld4BaseOpDispenser {
}
@Override
public Cqld4CqlOp apply(long cycle) {
public Cqld4CqlOp getOp(long cycle) {
// return new CqlD4SsTable(
// getSessionFunc().apply(value),
// (SsTable) stmtFunc.apply(value),

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4BatchStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlBatchStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import java.util.function.LongFunction;
public class CqlD4BatchStmtMapper implements OpMapper<Cqld4CqlOp> {
private final LongFunction<CqlSession> sessionFunc;
private final TypeAndTarget<CqlD4OpType, String> target;
private final DriverAdapter adapter;
public CqlD4BatchStmtMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType,String> target) {
this.sessionFunc=sessionFunc;
this.target = target;
this.adapter = adapter;
}
/**
* TODO: Make this not require a sub-op element for "uniform batches",
* but allow a sub-op sequence for custom batches.
* @param op the function argument
* @return
*/
public OpDispenser<Cqld4CqlOp> apply(ParsedOp op) {
ParsedOp subop = op.getAsSubOp("op_template", ParsedOp.SubOpNaming.ParentAndSubKey);
int repeat = op.getStaticValue("repeat");
OpMapper<Cqld4CqlOp> subopMapper = adapter.getOpMapper();
OpDispenser<? extends Cqld4CqlOp> subopDispenser = subopMapper.apply(subop);
return new CqlD4BatchStmtDispenser(adapter, sessionFunc, op,repeat, subop, subopDispenser);
}
}

View File

@ -43,6 +43,14 @@ public enum CqlD4OpType {
*/
prepared,
/**
* Allows for a statement template to be used to create a batch statement.
* The fields 'op_template', and 'repeat' are required, and all fields below
* the op_template field are a nested version of the other op types here, but
* supports only the simple and prepared forms for historic compatibility reasons.
*/
batch,
/**
* uses {@link com.datastax.dse.driver.api.core.graph.ScriptGraphStatement}
* This is the "raw" mode of using gremlin. It is not as efficient, and thus

View File

@ -72,6 +72,7 @@ public class Cqld4CoreOpMapper implements OpMapper<Op> {
case raw -> new CqlD4RawStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
case simple -> new CqlD4CqlSimpleStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
case prepared -> new CqlD4PreparedStmtMapper(adapter, sessionFunc, target).apply(op);
case batch -> new CqlD4BatchStmtMapper(adapter, sessionFunc, target).apply(op);
case gremlin -> new Cqld4GremlinOpMapper(adapter, sessionFunc, target.targetFunction).apply(op);
case fluent -> new Cqld4FluentGraphOpMapper(adapter, sessionFunc, target).apply(op);
case rainbow -> new CqlD4RainbowTableMapper(adapter, sessionFunc, target.targetFunction).apply(op);

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.optionhelpers;
import com.datastax.oss.driver.api.core.cql.BatchType;
public enum BatchTypeEnum {
logged(BatchType.LOGGED),
unlogged(BatchType.UNLOGGED),
counter(BatchType.COUNTER);
public final BatchType batchtype;
BatchTypeEnum(BatchType batchtype) {
this.batchtype = batchtype;
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.datamappers.functions.to_cqlvector;
import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
import java.util.HashMap;
import java.util.function.Function;
@Categories(Category.state)
@ThreadSafeMapper
public class LoadCqlVector implements Function<Object,com.datastax.oss.driver.api.core.data.CqlVector> {
private final String name;
private final Function<Object,Object> nameFunc;
private final com.datastax.oss.driver.api.core.data.CqlVector defaultValue;
@Example({"LoadDouble('foo')","for the current thread, load a double value from the named variable."})
public LoadCqlVector(String name) {
this.name = name;
this.nameFunc=null;
this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(0.0f);
}
@Example({"LoadDouble('foo',23D)","for the current thread, load a double value from the named variable," +
"or the default value if the named variable is not defined."})
public LoadCqlVector(String name, int len) {
this.name = name;
this.nameFunc=null;
Double[] ary = new Double[len];
for (int i = 0; i < len; i++) {
ary[i]=(double)i;
}
this.defaultValue=com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
}
@Override
public com.datastax.oss.driver.api.core.data.CqlVector apply(Object o) {
HashMap<String, Object> map = SharedState.tl_ObjectMap.get();
String varname=(nameFunc!=null) ? String.valueOf(nameFunc.apply(o)) : name;
Object value = map.getOrDefault(varname, defaultValue);
if (value instanceof CqlVector<?> cqlvector) {
return cqlvector;
} else if (value instanceof float[] fa) {
Float[] ary = new Float[fa.length];
for (int i = 0; i < fa.length; i++) {
ary[i]=fa[i];
}
return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
} else if (value instanceof double[] da) {
Double[] ary = new Double[da.length];
for (int i = 0; i < da.length; i++) {
ary[i]=da[i];
}
return com.datastax.oss.driver.api.core.data.CqlVector.newInstance(ary);
} else {
return (com.datastax.oss.driver.api.core.data.CqlVector) value;
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.datamappers.functions.to_cqlvector;
import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongFunction;
@Categories(Category.state)
@ThreadSafeMapper
public class LoadCqlVectorFromArray implements LongFunction<CqlVector> {
private final String name;
private final Function<Object, Object> nameFunc;
private final CqlVector[] defaultValue;
private final int len;
private final int batchsize;
public LoadCqlVectorFromArray(String name, int len, int batchsize) {
this.name = name;
this.nameFunc = null;
Float[] ary = new Float[len];
for (int i = 0; i < len; i++) {
ary[i] = (float)i;
}
this.defaultValue = new CqlVector[]{CqlVector.newInstance(ary)};
this.len = len;
this.batchsize = batchsize;
}
@Override
public CqlVector apply(long cycle) {
int offset = (int) (cycle % batchsize);
HashMap<String, Object> map = SharedState.tl_ObjectMap.get();
String varname = (nameFunc != null) ? String.valueOf(nameFunc.apply(cycle)) : name;
Object object = map.getOrDefault(varname, defaultValue);
if (object.getClass().isArray()) {
object = Array.get(object,offset);
} else if (object instanceof double[][] dary) {
object = dary[offset];
} else if (object instanceof float[][] fary) {
object = fary[offset];
} else if (object instanceof Double[][] dary) {
object = dary[offset];
} else if (object instanceof Float[][] fary) {
object = fary[offset];
} else if (object instanceof CqlVector<?>[] cary) {
object = cary[offset];
} else if (object instanceof List<?> list) {
object = list.get(offset);
} else {
throw new RuntimeException("Unrecognized type for ary of ary:" + object.getClass().getCanonicalName());
}
if (object instanceof CqlVector<?> cqlvector) {
return cqlvector;
} else if (object instanceof float[] fa) {
Float[] ary = new Float[fa.length];
for (int i = 0; i < fa.length; i++) {
ary[i] = fa[i];
}
return CqlVector.newInstance(ary);
} else if (object instanceof double[] da) {
Double[] ary = new Double[da.length];
for (int i = 0; i < da.length; i++) {
ary[i] = da[i];
}
return CqlVector.newInstance(ary);
} else {
return (CqlVector) object;
}
}
}

View File

@ -45,5 +45,4 @@ public class CqlUtils extends NBBaseComponent {
return rows.stream().mapToInt(r -> Integer.parseInt(Objects.requireNonNull(r.getString(fieldName)))).toArray();
}
}

View File

@ -9,11 +9,17 @@ description: |
population are replaced with new values which never repeat. During the main phase, random partitions are selected for
upsert, with row values never repeating.
TEMPLATE(batchsize,100)
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
batch:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup_batch cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
astra:
schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
@ -26,6 +32,7 @@ scenarios:
bindings:
seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String
seq_value: Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
batch_seq_value: Mul(TEMPLATE(batchsize,100)L); Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
rw_key: TEMPLATE(keydist,Uniform(0,1000000000)); ToString() -> String
rw_value: Hash(); TEMPLATE(valdist,Uniform(0,1000000000)); ToString() -> String
@ -62,6 +69,18 @@ blocks:
insert into TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue)
(key, value)
values ({seq_key},{seq_value});
rampup_batch:
params:
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
ops:
rampup_insert:
batch: testing
repeat: 100
op_template:
prepared: |
insert into TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue)
(key, value)
values ({seq_key},{seq_value});
verify:
params:
cl: TEMPLATE(read_cl,LOCAL_QUORUM)

View File

@ -69,6 +69,8 @@ nb5 ... driverconfig='http://gist.github.com...'
* **showstmt** - enable per-statement diagnostics which show as much of the statement as possible
for the given statement type. *WARNING* - Do not use this for performance testing, only for
diagnostics.
* **diag** - a set of options for advanced diagnostics for CQL. Defaults to `diag=none`.
Valid values are none, addr, mid, all. Presently, only none and all are supported.
* **maxpages** - configure the maximum number of pages allowed in a CQL result set. This is
configured to `maxpages=1` by default, so that users will be aware of any paging that occurs
by default. If you expect and want to allow paging in your operation, then set this number
@ -140,6 +142,13 @@ ops:
raw: |
create table if not exist {ksname}.{tblname} ...
example-batch-stmt:
batch:
repeat: 50
op_template:
prepared: |
select three, four from knock.onthedoor where ...
# gremlin statement using the fluent API, as it would be written in a client application
example-fluent-graph-stmt:
fluent: >-

View File

@ -129,7 +129,7 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
}
@Override
public DiagOp apply(long value) {
public DiagOp getOp(long value) {
return opFunc.apply(value);
}
}

View File

@ -43,7 +43,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.658</version>
<version>1.12.681</version>
</dependency>
</dependencies>

View File

@ -128,7 +128,7 @@ public class DDBCreateTableOpDispenser extends BaseOpDispenser<DynamoDBOp, Dynam
}
@Override
public DDBCreateTableOp apply(long cycle) {
public DDBCreateTableOp getOp(long cycle) {
CreateTableRequest rq = new CreateTableRequest();
rq.setTableName(tableNameFunc.apply(cycle));
rq.setKeySchema(keySchemaFunc.apply(cycle));

View File

@ -47,7 +47,7 @@ public class DDBDeleteTableOpDispenser extends BaseOpDispenser<DynamoDBOp, Dynam
}
@Override
public DDBDeleteTableOp apply(long cycle) {
public DDBDeleteTableOp getOp(long cycle) {
DeleteTableRequest rq = new DeleteTableRequest();
rq.setTableName(tableNameFunc.apply(cycle));
return new DDBDeleteTableOp(ddb, rq);

View File

@ -83,7 +83,7 @@ public class DDBGetItemOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBS
}
@Override
public DDBGetItemOp apply(long value) {
public DDBGetItemOp getOp(long value) {
Table table = targetTableFunction.apply(value);
GetItemSpec getitemSpec = getItemSpecFunc.apply(value);
return new DDBGetItemOp(ddb, table, getitemSpec);

View File

@ -51,7 +51,7 @@ public class DDBPutItemOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBS
}
@Override
public DynamoDBOp apply(long value) {
public DynamoDBOp getOp(long value) {
String tablename = tableNameFunc.apply(value);
Item item = itemfunc.apply(value);
return new DDBPutItemOp(ddb,tablename,item);

View File

@ -150,7 +150,7 @@ public class DDBQueryOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBSpa
}
@Override
public DDBQueryOp apply(long cycle) {
public DDBQueryOp getOp(long cycle) {
Table table = tableFunc.apply(cycle);
QuerySpec queryspec = querySpecFunc.apply(cycle);
return new DDBQueryOp(ddb,table,queryspec);

View File

@ -44,7 +44,7 @@ public class RawDynamoDBOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDB
}
@Override
public DynamoDBOp apply(long value) {
public DynamoDBOp getOp(long value) {
String body = jsonFunction.apply(value);
return new RawDynamodOp(ddb,body);
}

View File

@ -56,7 +56,7 @@
<dependency>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator</artifactId>
<version>7.3.0</version>
<version>7.4.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@ -2,13 +2,13 @@ package io.nosqlbench.adapter.http;
/*
* Copyright (c) 2022 nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -22,6 +22,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.sql.Array;
import java.util.ArrayList;
import java.util.List;
@ -61,6 +62,56 @@ public class JsonElementUtils {
i++;
}
return keys;
}
public static List<Float> customNumberArrayToFloatList(JsonElement element) {
JsonObject o1 = element.getAsJsonObject();
JsonElement data = o1.get("data");
JsonArray dary = data.getAsJsonArray();
JsonElement element0 = dary.get(0);
JsonObject eobj1 = element0.getAsJsonObject();
JsonElement embedding = eobj1.get("embedding");
JsonArray ary = embedding.getAsJsonArray();
ArrayList<Float> list = new ArrayList<>(ary.size());
for (JsonElement jsonElement : ary) {
list.add(jsonElement.getAsFloat());
}
return list;
}
public static float[] customNumberArrayToFloatArray(JsonElement element) {
JsonObject o1 = element.getAsJsonObject();
JsonElement data = o1.get("data");
JsonArray dary = data.getAsJsonArray();
JsonElement element0 = dary.get(0);
JsonObject eobj1 = element0.getAsJsonObject();
JsonElement embedding = eobj1.get("embedding");
JsonArray ary = embedding.getAsJsonArray();
float[] floats = new float[ary.size()];
for (int i = 0; i < floats.length; i++) {
floats[i]=ary.get(i).getAsFloat();
}
return floats;
}
public static float[][] customNumberArrayToFloatArrayBatch(JsonElement element) {
JsonObject o1 = element.getAsJsonObject();
JsonElement data = o1.get("data");
JsonArray dary = data.getAsJsonArray();
float[][] floats2dary = new float[dary.size()][];
for (int vector_idx = 0; vector_idx < dary.size(); vector_idx++) {
JsonElement element0 = dary.get(vector_idx);
JsonObject eobj1 = element0.getAsJsonObject();
JsonElement embedding = eobj1.get("embedding");
JsonArray vectorAry = embedding.getAsJsonArray();
float[] newV = new float[vectorAry.size()];
for (int component_idx = 0; component_idx < vectorAry.size(); component_idx++) {
newV[component_idx]=vectorAry.get(component_idx).getAsFloat();
}
floats2dary[vector_idx]=newV;
}
return floats2dary;
}
}

View File

@ -128,7 +128,7 @@ public class HttpOpDispenser extends BaseOpDispenser<HttpOp, HttpSpace> {
}
@Override
public HttpOp apply(long value) {
public HttpOp getOp(long value) {
HttpOp op = this.opFunc.apply(value);
return op;

View File

@ -48,7 +48,7 @@ public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
}
}
@Override
public JDBCDDLOp apply(long cycle) {
public JDBCDDLOp getOp(long cycle) {
String ddlSqlStr = ddlSqlStrFunc.apply(cycle);
return new JDBCDDLOp(jdbcSpace, ddlSqlStr);
}

View File

@ -101,7 +101,7 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
}
@Override
public JDBCDMLOp apply(long cycle) {
public JDBCDMLOp getOp(long cycle) {
if (isReadStatement) {
return new JDBCDMLReadOp(
jdbcSpace,

View File

@ -155,7 +155,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
}
@Override
public KafkaOp apply(final long cycle) {
public KafkaOp getOp(final long cycle) {
final List<String> topicNameList = this.getEffectiveTopicNameList(cycle);
final String groupId = this.getEffectiveGroupId(cycle);
if ((0 == topicNameList.size()) || StringUtils.isBlank(groupId)) throw new KafkaAdapterInvalidParamException(

View File

@ -200,7 +200,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
}
@Override
public KafkaOp apply(final long cycle) {
public KafkaOp getOp(final long cycle) {
final String topicName = this.topicNameStrFunc.apply(cycle);
final String clientId = this.getEffectiveClientId(cycle);

View File

@ -47,6 +47,11 @@
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.24.0</version>
</dependency>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>

View File

@ -53,7 +53,7 @@ public class MilvusOpMapper implements OpMapper<MilvusBaseOp<?>> {
"type",
"target"
);
logger.info(() -> "Using '" + typeAndTarget.enumId + "' statement form for '" + op.getName() + "'");
logger.info(() -> "Using '" + typeAndTarget.enumId + "' op type for op template '" + op.getName() + "'");
return switch (typeAndTarget.enumId) {
case drop_collection -> new MilvusDropCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.exceptions;
import io.milvus.grpc.GetLoadStateResponse;
import io.milvus.param.R;
import java.time.Duration;
public class MilvusAwaitStateIncompleteError extends RuntimeException {
private final R<GetLoadStateResponse> loadState;
private final Duration timeout;
private final String timeSummary;
public MilvusAwaitStateIncompleteError(R<GetLoadStateResponse> loadState, Duration timeout, String timeSummary) {
this.loadState = loadState;
this.timeout = timeout;
this.timeSummary = timeSummary;
}
@Override
public String getMessage() {
return super.getMessage() + ": at time " +timeSummary;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.exceptions;
import io.milvus.param.index.DescribeIndexParam;
import io.nosqlbench.adapter.milvus.ops.MilvusDescribeIndexOp;
import java.util.List;
public class MilvusIndexingIncompleteError extends RuntimeException {
private final DescribeIndexParam milvusDescribeIndexOp;
private final int tried;
private final List<MilvusDescribeIndexOp.IndexStat> stats;
public MilvusIndexingIncompleteError(DescribeIndexParam milvusDescribeIndexOp, int tried, List<MilvusDescribeIndexOp.IndexStat> stats) {
this.milvusDescribeIndexOp = milvusDescribeIndexOp;
this.tried = tried;
this.stats = stats;
}
@Override
public String getMessage() {
return super.getMessage() + ": "
+ "tries:" + tried + "/" + tried
+ ", index:" + milvusDescribeIndexOp.getIndexName()
+ ", database:" + milvusDescribeIndexOp.getDatabaseName()
+ ", collection:" + milvusDescribeIndexOp.getCollectionName();
}
}

View File

@ -42,7 +42,8 @@ public class MilvusAlterCollectionOpDispenser extends MilvusBaseOpDispenser<Alte
LongFunction<AlterCollectionParam.Builder> ebF =
l -> AlterCollectionParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF,"ttl", Integer.class, AlterCollectionParam.Builder::withTTL);
ebF = op.enhanceFuncOptionally(ebF, "ttl", Number.class,
(AlterCollectionParam.Builder b, Number n) -> b.withTTL(n.intValue()));
final LongFunction<AlterCollectionParam.Builder> lastF = ebF;
final LongFunction<AlterCollectionParam> collectionParamF = l -> lastF.apply(l).build();
@ -56,6 +57,6 @@ public class MilvusAlterCollectionOpDispenser extends MilvusBaseOpDispenser<Alte
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusAlterCollectionOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusAlterCollectionOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -58,7 +58,7 @@ public abstract class MilvusBaseOpDispenser<T> extends BaseOpDispenser<MilvusBas
);
@Override
public MilvusBaseOp<T> apply(long value) {
public MilvusBaseOp<T> getOp(long value) {
return opF.apply(value);
}
}

View File

@ -39,12 +39,9 @@ public class MilvusCreateCollectionOpDispenser extends MilvusBaseOpDispenser<Cre
/**
* Create a new MilvusCreateCollectionOpDispenser subclassed from {@link MilvusBaseOpDispenser}.
*
* @param adapter
* The associated {@link MilvusDriverAdapter}
* @param op
* The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction
* A LongFunction that returns the specified Milvus Index for this Op
* @param adapter The associated {@link MilvusDriverAdapter}
* @param op The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction A LongFunction that returns the specified Milvus Index for this Op
*/
public MilvusCreateCollectionOpDispenser(MilvusDriverAdapter adapter,
ParsedOp op,
@ -61,10 +58,10 @@ public class MilvusCreateCollectionOpDispenser extends MilvusBaseOpDispenser<Cre
LongFunction<CreateCollectionParam.Builder> ebF =
l -> CreateCollectionParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF, "shards_num", Integer.class,
CreateCollectionParam.Builder::withShardsNum);
ebF = op.enhanceFuncOptionally(ebF, "partition_num", Integer.class,
CreateCollectionParam.Builder::withPartitionsNum);
ebF = op.enhanceFuncOptionally(ebF, "shards_num", Number.class,
(CreateCollectionParam.Builder b, Number n) -> b.withShardsNum(n.intValue()));
ebF = op.enhanceFuncOptionally(ebF, "partition_num", Number.class,
(CreateCollectionParam.Builder b, Number n) -> b.withPartitionsNum(n.intValue()));
ebF = op.enhanceFuncOptionally(ebF, "description", String.class,
CreateCollectionParam.Builder::withDescription);
ebF = op.enhanceEnumOptionally(ebF, "consistency_level",
@ -97,8 +94,7 @@ public class MilvusCreateCollectionOpDispenser extends MilvusBaseOpDispenser<Cre
/**
* Function to build the {@link FieldType}s for the {@link CreateCollectionParam}.
*
* @param fieldTypesData
* The static map of config data from the create collection request
* @param fieldTypesData The static map of config data from the create collection request
* @param ebF
* @return a list of static field types
*/
@ -112,22 +108,22 @@ public class MilvusCreateCollectionOpDispenser extends MilvusBaseOpDispenser<Cre
.ifPresent(builder::withPrimaryKey);
fieldspec.getOptionalStaticValue("auto_id", Boolean.class)
.ifPresent(builder::withAutoID);
fieldspec.getOptionalStaticConfig("max_length", Integer.class)
.ifPresent(builder::withMaxLength);
fieldspec.getOptionalStaticConfig("max_capacity", Integer.class)
.ifPresent(builder::withMaxCapacity);
fieldspec.getOptionalStaticValue(List.of("partition_key","partition"), Boolean.class)
fieldspec.getOptionalStaticConfig("max_length", Number.class)
.ifPresent((Number n) -> builder.withMaxLength(n.intValue()));
fieldspec.getOptionalStaticConfig("max_capacity", Number.class)
.ifPresent((Number n) -> builder.withMaxCapacity(n.intValue()));
fieldspec.getOptionalStaticValue(List.of("partition_key", "partition"), Boolean.class)
.ifPresent(builder::withPartitionKey);
fieldspec.getOptionalStaticValue("dimension", Integer.class)
.ifPresent(builder::withDimension);
fieldspec.getOptionalStaticValue("dimension", Number.class)
.ifPresent((Number n) -> builder.withDimension(n.intValue()));
fieldspec.getOptionalStaticConfig("data_type", String.class)
.map(DataType::valueOf)
.ifPresent(builder::withDataType);
fieldspec.getOptionalStaticConfig("type_params", Map.class)
.ifPresent(builder::withTypeParams);
fieldspec.getOptionalStaticConfig("element_type",String.class)
.map(DataType::valueOf)
.ifPresent(builder::withElementType);
fieldspec.getOptionalStaticConfig("element_type", String.class)
.map(DataType::valueOf)
.ifPresent(builder::withElementType);
fieldTypes.add(builder.build());
});

View File

@ -36,12 +36,9 @@ public class MilvusCreateIndexOpDispenser extends MilvusBaseOpDispenser<CreateIn
/**
* Create a new MilvusCreateIndexOpDispenser subclassed from {@link MilvusBaseOpDispenser}.
*
* @param adapter
* The associated {@link MilvusDriverAdapter}
* @param op
* The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction
* A LongFunction that returns the specified Milvus Index for this Op
* @param adapter The associated {@link MilvusDriverAdapter}
* @param op The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction A LongFunction that returns the specified Milvus Index for this Op
*/
public MilvusCreateIndexOpDispenser(
MilvusDriverAdapter adapter,
@ -56,23 +53,28 @@ public class MilvusCreateIndexOpDispenser extends MilvusBaseOpDispenser<CreateIn
LongFunction<CreateIndexParam.Builder> bF =
l -> CreateIndexParam.newBuilder().withIndexName(targetF.apply(l));
bF = op.enhanceFunc(bF, List.of("collection","collection_name"), String.class,
bF = op.enhanceFunc(bF, List.of("collection", "collection_name"), String.class,
CreateIndexParam.Builder::withCollectionName);
bF = op.enhanceFunc(bF, "field_name", String.class, CreateIndexParam.Builder::withFieldName);
bF = op.enhanceEnumOptionally(bF, "index_type", IndexType.class, CreateIndexParam.Builder::withIndexType);
bF = op.enhanceEnumOptionally(bF, "metric_type", MetricType.class, CreateIndexParam.Builder::withMetricType);
bF = op.enhanceFuncOptionally(bF, "extra_param", String.class, CreateIndexParam.Builder::withExtraParam);
bF = op.enhanceFuncOptionally(bF, "sync_mode", Boolean.class, CreateIndexParam.Builder::withSyncMode);
bF = op.enhanceFuncOptionally(bF, "sync_waiting_interval", Long.class, CreateIndexParam.Builder::withSyncWaitingInterval);
bF = op.enhanceFuncOptionally(bF, "sync_waiting_timeout", Long.class, CreateIndexParam.Builder::withSyncWaitingTimeout);
bF = op.enhanceFuncOptionally(bF, List.of("database","database_name"), String.class,
bF = op.enhanceFuncOptionally(bF, "sync_waiting_interval", Number.class,
(CreateIndexParam.Builder b, Number n) -> b.withSyncWaitingInterval(n.longValue()));
bF = op.enhanceFuncOptionally(bF, "sync_waiting_timeout", Number.class,
(CreateIndexParam.Builder b, Number n) -> b.withSyncWaitingTimeout(n.longValue()));
bF = op.enhanceFuncOptionally(bF, List.of("database", "database_name"), String.class,
CreateIndexParam.Builder::withDatabaseName);
LongFunction<CreateIndexParam.Builder> finalBF1 = bF;
return l -> finalBF1.apply(l).build();
}
@Override
public LongFunction<MilvusBaseOp<CreateIndexParam>> createOpFunc(LongFunction<CreateIndexParam> paramF, LongFunction<MilvusServiceClient> clientF, ParsedOp op, LongFunction<String> targetF) {
public LongFunction<MilvusBaseOp<CreateIndexParam>> createOpFunc(
LongFunction<CreateIndexParam> paramF,
LongFunction<MilvusServiceClient> clientF,
ParsedOp op, LongFunction<String> targetF) {
return l -> new MilvusCreateIndexOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -18,21 +18,31 @@ package io.nosqlbench.adapter.milvus.opdispensers;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.index.DescribeIndexParam;
import io.milvus.param.partition.CreatePartitionParam;
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
import io.nosqlbench.adapter.milvus.ops.MilvusDescribeIndexOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.function.LongFunction;
public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<DescribeIndexParam> {
private Duration awaitTimeout = Duration.ZERO;
private Duration awaitInterval = Duration.of(10, ChronoUnit.SECONDS);
public MilvusDescribeIndexOpDispenser(MilvusDriverAdapter adapter,
ParsedOp op,
LongFunction<String> targetFunction) {
super(adapter, op, targetFunction);
op.getOptionalStaticValue("await_timeout", Number.class)
.map(Number::doubleValue)
.ifPresent(v->this.awaitTimeout = Duration.of((long)(v*1000), ChronoUnit.MILLIS));
op.getOptionalStaticValue("await_interval", Number.class)
.map(Number::doubleValue).ifPresent(v->this.awaitInterval =Duration.of((long)(v*1000),ChronoUnit.MILLIS));
}
@Override
@ -43,11 +53,12 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
) {
LongFunction<DescribeIndexParam.Builder> ebF =
l -> DescribeIndexParam.newBuilder().withIndexName(targetF.apply(l));
ebF = op.enhanceFunc(ebF, List.of("collection","collection_name"),String.class,
ebF = op.enhanceFunc(ebF, List.of("collection","collection_name"), String.class,
DescribeIndexParam.Builder::withCollectionName);
ebF = op.enhanceFunc(ebF,List.of("database_name","database"),String.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name","database"), String.class,
DescribeIndexParam.Builder::withDatabaseName);
final LongFunction<DescribeIndexParam.Builder> lastF = ebF;
final LongFunction<DescribeIndexParam> collectionParamF = l -> lastF.apply(l).build();
return collectionParamF;
@ -60,6 +71,11 @@ public class MilvusDescribeIndexOpDispenser extends MilvusBaseOpDispenser<Descri
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusDescribeIndexOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusDescribeIndexOp(
clientF.apply(l),
paramF.apply(l),
awaitTimeout,
awaitInterval
);
}
}

View File

@ -51,12 +51,12 @@ public class MilvusFlushOpDispenser extends MilvusBaseOpDispenser<FlushParam> {
};
LongFunction<FlushParam.Builder> finalEbF = ebF;
ebF = l -> finalEbF.apply(l).withCollectionNames(cnames.apply(l));
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name","database"),String.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name", "database"), String.class,
FlushParam.Builder::withDatabaseName);
ebF = op.enhanceFuncOptionally(ebF, "sync_flush_waiting_interval",Long.class,
FlushParam.Builder::withSyncFlushWaitingInterval);
ebF = op.enhanceFuncOptionally(ebF, "sync_flush_waiting_timeout",Long.class,
FlushParam.Builder::withSyncFlushWaitingTimeout);
ebF = op.enhanceFuncOptionally(ebF, "sync_flush_waiting_interval", Number.class,
(FlushParam.Builder b, Number n) -> b.withSyncFlushWaitingInterval(n.longValue()));
ebF = op.enhanceFuncOptionally(ebF, "sync_flush_waiting_timeout", Number.class,
(FlushParam.Builder b, Number n) -> b.withSyncFlushWaitingTimeout(n.longValue()));
final LongFunction<FlushParam.Builder> lastF = ebF;
final LongFunction<FlushParam> collectionParamF = l -> lastF.apply(l).build();
@ -70,6 +70,6 @@ public class MilvusFlushOpDispenser extends MilvusBaseOpDispenser<FlushParam> {
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusFlushOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusFlushOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -52,9 +52,10 @@ public class MilvusGetFlushStateOpDispenser extends MilvusBaseOpDispenser<GetFlu
};
LongFunction<GetFlushStateParam.Builder> finalEbF = ebF;
ebF = l -> finalEbF.apply(l).withSegmentIDs(idsF.apply(l));
ebF = op.enhanceFuncOptionally(ebF,List.of("collection","collection_name"),String.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("collection", "collection_name"), String.class,
GetFlushStateParam.Builder::withCollectionName);
ebF = op.enhanceFuncOptionally(ebF,"flush_ts",Long.class,GetFlushStateParam.Builder::withFlushTs);
ebF = op.enhanceFuncOptionally(ebF, "flush_ts", Number.class,
(GetFlushStateParam.Builder b, Number n) -> b.withFlushTs(n.longValue()));
final LongFunction<GetFlushStateParam.Builder> lastF = ebF;
final LongFunction<GetFlushStateParam> collectionParamF = l -> lastF.apply(l).build();
@ -68,6 +69,6 @@ public class MilvusGetFlushStateOpDispenser extends MilvusBaseOpDispenser<GetFlu
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusGetFlushStateOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusGetFlushStateOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -17,23 +17,49 @@
package io.nosqlbench.adapter.milvus.opdispensers;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.LoadState;
import io.milvus.param.collection.GetLoadStateParam;
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.MilvusAdapterUtils;
import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
import io.nosqlbench.adapter.milvus.ops.MilvusGetLoadStateOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.OpConfigError;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;
public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoadStateParam> {
private Duration awaitTimeout = Duration.ZERO;
private Duration awaitInterval = Duration.of(10, ChronoUnit.SECONDS);
private LoadState awaitState = LoadState.UNRECOGNIZED;
public MilvusGetLoadStateOpDispenser(MilvusDriverAdapter adapter,
ParsedOp op,
LongFunction<String> targetFunction) {
super(adapter, op, targetFunction);
op.getOptionalStaticValue("await_timeout", Number.class)
.map(Number::doubleValue)
.ifPresent(v->this.awaitTimeout=Duration.of((long)(v*1000),ChronoUnit.MILLIS));
op.getOptionalStaticValue("await_interval", Number.class)
.map(Number::doubleValue).ifPresent(v->this.awaitInterval=Duration.of((long)(v*1000),ChronoUnit.MILLIS));
op.getOptionalStaticValue("await_state", String.class).ifPresent(s -> {
var spec = s.toLowerCase();
for (LoadState value : LoadState.values()) {
if (value.name().toLowerCase().equals(spec) || value.name().toLowerCase().equals("loadstate" + spec)) {
this.awaitState = value;
break;
}
}
if (this.awaitState == null) {
throw new OpConfigError("Unrecognizable load state to await: " + spec);
}
});
}
@Override
@ -44,7 +70,7 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
) {
LongFunction<GetLoadStateParam.Builder> ebF =
l -> GetLoadStateParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name","database"),String.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name", "database"), String.class,
GetLoadStateParam.Builder::withDatabaseName);
Optional<LongFunction<String>> partitionsF = op.getAsOptionalFunction("partition_name", String.class);
@ -54,6 +80,8 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
ebF = l -> finalEbF.apply(l).withPartitionNames(MilvusAdapterUtils.splitNames(pfunc.apply(l)));
}
final LongFunction<GetLoadStateParam.Builder> lastF = ebF;
return l -> lastF.apply(l).build();
}
@ -65,6 +93,12 @@ public class MilvusGetLoadStateOpDispenser extends MilvusBaseOpDispenser<GetLoad
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusGetLoadStateOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusGetLoadStateOp(
clientF.apply(l),
paramF.apply(l),
this.awaitState,
this.awaitTimeout,
this.awaitInterval
);
}
}

View File

@ -41,7 +41,8 @@ public class MilvusListBulkInsertTasksOpDispenser extends MilvusBaseOpDispenser<
) {
LongFunction<ListBulkInsertTasksParam.Builder> ebF =
l -> ListBulkInsertTasksParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF,"limit",Integer.class,ListBulkInsertTasksParam.Builder::withLimit);
ebF = op.enhanceFuncOptionally(ebF, "limit", Number.class,
(ListBulkInsertTasksParam.Builder b, Number n) -> b.withLimit(n.intValue()));
final LongFunction<ListBulkInsertTasksParam.Builder> lastF = ebF;
final LongFunction<ListBulkInsertTasksParam> collectionParamF = l -> lastF.apply(l).build();
@ -55,6 +56,6 @@ public class MilvusListBulkInsertTasksOpDispenser extends MilvusBaseOpDispenser<
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusListBulkInsertTasksOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusListBulkInsertTasksOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -43,16 +43,18 @@ public class MilvusLoadCollectionOpDispenser extends MilvusBaseOpDispenser<LoadC
LongFunction<LoadCollectionParam.Builder> ebF =
l -> LoadCollectionParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF,List.of("database_name","database"),String.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("database_name", "database"), String.class,
LoadCollectionParam.Builder::withDatabaseName);
ebF = op.enhanceFuncOptionally(ebF,"refresh",Boolean.class,LoadCollectionParam.Builder::withRefresh);
ebF = op.enhanceFuncOptionally(ebF,"sync_load",Boolean.class,LoadCollectionParam.Builder::withSyncLoad);
ebF = op.enhanceFuncOptionally(ebF,"replica_number",Integer.class,LoadCollectionParam.Builder::withReplicaNumber);
ebF = op.enhanceFuncOptionally(ebF,"resource_groups", List.class,LoadCollectionParam.Builder::withResourceGroups);
ebF = op.enhanceFuncOptionally(ebF,"sync_load_waiting_interval",Long.class,LoadCollectionParam.Builder::withSyncLoadWaitingInterval);
ebF = op.enhanceFuncOptionally(ebF,"sync_load_waiting_timeout",Long.class,
LoadCollectionParam.Builder::withSyncLoadWaitingTimeout);
ebF = op.enhanceFuncOptionally(ebF, "refresh", Boolean.class, LoadCollectionParam.Builder::withRefresh);
ebF = op.enhanceFuncOptionally(ebF, "sync_load", Boolean.class, LoadCollectionParam.Builder::withSyncLoad);
ebF = op.enhanceFuncOptionally(ebF, "replica_number", Number.class,
(LoadCollectionParam.Builder b, Number n) -> b.withReplicaNumber(n.intValue()));
ebF = op.enhanceFuncOptionally(ebF, "resource_groups", List.class, LoadCollectionParam.Builder::withResourceGroups);
ebF = op.enhanceFuncOptionally(ebF, "sync_load_waiting_interval", Number.class,
(LoadCollectionParam.Builder b, Number n) -> b.withSyncLoadWaitingInterval(n.longValue()));
ebF = op.enhanceFuncOptionally(ebF, "sync_load_waiting_timeout", Number.class,
(LoadCollectionParam.Builder b, Number n) -> b.withSyncLoadWaitingTimeout(n.longValue()));
final LongFunction<LoadCollectionParam.Builder> lastF = ebF;
final LongFunction<LoadCollectionParam> collectionParamF = l -> lastF.apply(l).build();
@ -66,6 +68,6 @@ public class MilvusLoadCollectionOpDispenser extends MilvusBaseOpDispenser<LoadC
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusLoadCollectionOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusLoadCollectionOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -50,22 +50,24 @@ public class MilvusLoadPartitionsOpDispenser extends MilvusBaseOpDispenser<LoadP
LongFunction<LoadPartitionsParam.Builder> ebF =
l -> LoadPartitionsParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFunc(ebF, List.of("partition_names","partitions"), List.class,
ebF = op.enhanceFunc(ebF, List.of("partition_names", "partitions"), List.class,
LoadPartitionsParam.Builder::withPartitionNames);
ebF = op.enhanceFuncOptionally(
ebF, "resource_groups", List.class,
LoadPartitionsParam.Builder::withResourceGroups
);
ebF = op.enhanceFuncOptionally(
ebF, List.of("database_name","database"), String.class,
ebF, List.of("database_name", "database"), String.class,
LoadPartitionsParam.Builder::withDatabaseName
);
ebF = op.enhanceFuncOptionally(ebF, "refresh", Boolean.class, LoadPartitionsParam.Builder::withRefresh);
ebF = op.enhanceFuncOptionally(ebF, "replica_number", Integer.class, LoadPartitionsParam.Builder::withReplicaNumber);
ebF = op.enhanceFuncOptionally(ebF,"sync_load",Boolean.class,LoadPartitionsParam.Builder::withSyncLoad);
ebF = op.enhanceFuncOptionally(ebF,"sync_load_waiting_interval",Long.class,LoadPartitionsParam.Builder::withSyncLoadWaitingInterval);
ebF = op.enhanceFuncOptionally(ebF,"sync_load_waiting_timeout",Long.class,
LoadPartitionsParam.Builder::withSyncLoadWaitingTimeout);
ebF = op.enhanceFuncOptionally(ebF, "replica_number", Number.class,
(LoadPartitionsParam.Builder b, Number n) -> b.withReplicaNumber(n.intValue()));
ebF = op.enhanceFuncOptionally(ebF, "sync_load", Boolean.class, LoadPartitionsParam.Builder::withSyncLoad);
ebF = op.enhanceFuncOptionally(ebF, "sync_load_waiting_interval", Number.class,
(LoadPartitionsParam.Builder b, Number n) -> b.withSyncLoadWaitingInterval(n.longValue()));
ebF = op.enhanceFuncOptionally(ebF, "sync_load_waiting_timeout", Number.class,
(LoadPartitionsParam.Builder b, Number n) -> b.withSyncLoadWaitingTimeout(n.longValue()));
final LongFunction<LoadPartitionsParam.Builder> lastF = ebF;
final LongFunction<LoadPartitionsParam> collectionParamF = l -> lastF.apply(l).build();

View File

@ -44,14 +44,14 @@ public class MilvusQueryOpDispenser extends MilvusBaseOpDispenser<QueryParam> {
LongFunction<QueryParam.Builder> ebF =
l -> QueryParam.newBuilder().withCollectionName(targetF.apply(l));
ebF = op.enhanceFuncOptionally(ebF,List.of("partition_names","partitions"), List.class,
ebF = op.enhanceFuncOptionally(ebF, List.of("partition_names", "partitions"), List.class,
QueryParam.Builder::withPartitionNames);
ebF = op.enhanceEnumOptionally(ebF,"consistency_level", ConsistencyLevelEnum.class, QueryParam.Builder::withConsistencyLevel);
ebF = op.enhanceFuncOptionally(ebF,"expr",String.class,QueryParam.Builder::withExpr);
ebF = op.enhanceFuncOptionally(ebF,"limit",Long.class,QueryParam.Builder::withLimit);
ebF = op.enhanceFuncOptionally(ebF,"offset",Long.class,QueryParam.Builder::withOffset);
ebF = op.enhanceFuncOptionally(ebF,"ignore_growing",Boolean.class,QueryParam.Builder::withIgnoreGrowing);
ebF = op.enhanceFuncOptionally(ebF,"out_fields",List.class,QueryParam.Builder::withOutFields);
ebF = op.enhanceEnumOptionally(ebF, "consistency_level", ConsistencyLevelEnum.class, QueryParam.Builder::withConsistencyLevel);
ebF = op.enhanceFuncOptionally(ebF, "expr", String.class, QueryParam.Builder::withExpr);
ebF = op.enhanceFuncOptionally(ebF, "limit", Number.class, (QueryParam.Builder b, Number n) -> b.withLimit(n.longValue()));
ebF = op.enhanceFuncOptionally(ebF, "offset", Number.class, (QueryParam.Builder b, Number n) -> b.withOffset(n.longValue()));
ebF = op.enhanceFuncOptionally(ebF, "ignore_growing", Boolean.class, QueryParam.Builder::withIgnoreGrowing);
ebF = op.enhanceFuncOptionally(ebF, "out_fields", List.class, QueryParam.Builder::withOutFields);
final LongFunction<QueryParam.Builder> lastF = ebF;
final LongFunction<QueryParam> collectionParamF = l -> lastF.apply(l).build();
@ -65,6 +65,6 @@ public class MilvusQueryOpDispenser extends MilvusBaseOpDispenser<QueryParam> {
ParsedOp op,
LongFunction<String> targetF
) {
return l -> new MilvusQueryOp(clientF.apply(l),paramF.apply(l));
return l -> new MilvusQueryOp(clientF.apply(l), paramF.apply(l));
}
}

View File

@ -24,12 +24,8 @@ import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.ops.MilvusBaseOp;
import io.nosqlbench.adapter.milvus.ops.MilvusSearchOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;
public class MilvusSearchOpDispenser extends MilvusBaseOpDispenser<SearchParam> {
@ -53,9 +49,11 @@ public class MilvusSearchOpDispenser extends MilvusBaseOpDispenser<SearchParam>
ebF = op.enhanceEnumOptionally(ebF, "consistency_level", ConsistencyLevelEnum.class, SearchParam.Builder::withConsistencyLevel);
ebF = op.enhanceFuncOptionally(ebF, "expr", String.class, SearchParam.Builder::withExpr);
ebF = op.enhanceDefaultFunc(ebF, "top_k", Integer.class, 100, SearchParam.Builder::withTopK);
ebF = op.enhanceDefaultFunc(ebF, "top_k", Number.class, 100,
(SearchParam.Builder b, Number n) -> b.withTopK(n.intValue()));
ebF = op.enhanceEnumOptionally(ebF, "metric_type", MetricType.class, SearchParam.Builder::withMetricType);
ebF = op.enhanceFuncOptionally(ebF, "round_decimal", Integer.class, SearchParam.Builder::withRoundDecimal);
ebF = op.enhanceFuncOptionally(ebF, "round_decimal", Number.class,
(SearchParam.Builder b, Number n) -> b.withRoundDecimal(n.intValue()));
ebF = op.enhanceFuncOptionally(ebF, "ignore_growing", Boolean.class, SearchParam.Builder::withIgnoreGrowing);
ebF = op.enhanceFuncOptionally(ebF, "params", String.class, SearchParam.Builder::withParams);
ebF = op.enhanceFunc(ebF, List.of("vector_field_name", "vector_field"), String.class,

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -50,6 +51,16 @@ public abstract class MilvusBaseOp<T> implements CycleOp<Object> {
try {
Object result = applyOp(value);
if (result instanceof R<?> r) {
var error = r.getException();
if (error!=null) {
throw error;
}
} else {
logger.warn("Op '" + this.toString() + "' did not return a Result 'R' type." +
" Exception handling will be bypassed"
);
}
return result;
} catch (Exception e) {
if (e instanceof RuntimeException rte) {

View File

@ -17,6 +17,8 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.CreateCollectionParam;
import io.nosqlbench.adapters.api.templating.ParsedOp;
@ -33,6 +35,7 @@ public class MilvusCreateCollectionOp extends MilvusBaseOp<CreateCollectionParam
@Override
public Object applyOp(long value) {
return client.createCollection(request);
R<RpcStatus> collection = client.createCollection(request);
return collection;
}
}

View File

@ -17,6 +17,8 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.CreateDatabaseParam;
public class MilvusCreateDatabaseOp extends MilvusBaseOp<CreateDatabaseParam> {
@ -26,6 +28,7 @@ public class MilvusCreateDatabaseOp extends MilvusBaseOp<CreateDatabaseParam> {
@Override
public Object applyOp(long value) {
return client.createDatabase(request);
R<RpcStatus> database = client.createDatabase(request);
return database;
}
}

View File

@ -17,6 +17,8 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeCollectionResponse;
import io.milvus.param.R;
import io.milvus.param.collection.DescribeCollectionParam;
public class MilvusDescribeCollectionOp extends MilvusBaseOp<DescribeCollectionParam> {
@ -26,6 +28,7 @@ public class MilvusDescribeCollectionOp extends MilvusBaseOp<DescribeCollectionP
@Override
public Object applyOp(long value) {
return client.describeCollection(request);
R<DescribeCollectionResponse> describeCollectionResponseR = client.describeCollection(request);
return describeCollectionResponseR;
}
}

View File

@ -17,15 +17,88 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeIndexResponse;
import io.milvus.grpc.IndexDescription;
import io.milvus.param.R;
import io.milvus.param.index.DescribeIndexParam;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
import io.nosqlbench.adapters.api.scheduling.TimeoutPredicate;
public class MilvusDescribeIndexOp extends MilvusBaseOp<DescribeIndexParam> {
public MilvusDescribeIndexOp(MilvusServiceClient client, DescribeIndexParam request) {
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public class MilvusDescribeIndexOp extends MilvusBaseOp<DescribeIndexParam> implements OpGenerator {
private final Duration timeout;
private final Duration interval;
private final TimeoutPredicate<Integer> timeoutPredicate;
private MilvusDescribeIndexOp nextOp;
private long lastAttemptAt = 0L;
public MilvusDescribeIndexOp(
MilvusServiceClient client,
DescribeIndexParam request,
Duration timeout,
Duration interval
) {
super(client, request);
this.timeout = timeout;
this.interval = interval;
this.timeoutPredicate = TimeoutPredicate.of(p -> p>=100, timeout, interval, true);
}
@Override
public Object applyOp(long value) {
return client.describeIndex(request);
nextOp = null;
timeoutPredicate.blockUntilNextInterval();
R<DescribeIndexResponse> describeIndexResponseR = client.describeIndex(request);
DescribeIndexResponse data = describeIndexResponseR.getData();
TimeoutPredicate.Result<Integer> result = timeoutPredicate.test(getIndexStats(data).percent());
String message = result.status().name() + " await state " + result.value() + " at time " + result.timeSummary();
logger.info(message);
if (result.isPending()) {
this.nextOp=this;
}
return describeIndexResponseR;
}
private IndexStats getIndexStats(DescribeIndexResponse data) {
var stats = new ArrayList<IndexStat>();
for (IndexDescription desc : data.getIndexDescriptionsList()) {
stats.add(new IndexStat(desc.getIndexName(), desc.getIndexedRows(), desc.getPendingIndexRows()));
}
return new IndexStats(stats);
}
public static class IndexStats extends ArrayList<IndexStat> {
public IndexStats(List<IndexStat> stats) {
super(stats);
}
public int percent() {
return stream().mapToInt(IndexStat::percent).min().orElse(0);
}
}
public record IndexStat(
String index_name,
long indexed_rows,
long pending_rows
) {
public int percent() {
if (pending_rows == 0) {
return 100;
}
return (int) (100.0d * ((double) indexed_rows / (double) (indexed_rows + pending_rows)));
}
}
@Override
public Op getNextOp() {
return nextOp;
}
}

View File

@ -16,7 +16,12 @@
package io.nosqlbench.adapter.milvus.ops;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.GetIndexBuildProgressResponse;
import io.milvus.param.R;
import io.milvus.param.index.GetIndexBuildProgressParam;
public class MilvusGetIndexBuildProgressOp extends MilvusBaseOp<GetIndexBuildProgressParam> {
@ -26,6 +31,13 @@ public class MilvusGetIndexBuildProgressOp extends MilvusBaseOp<GetIndexBuildPro
@Override
public Object applyOp(long value) {
return client.getIndexBuildProgress(request);
R<GetIndexBuildProgressResponse> indexBuildProgress = client.getIndexBuildProgress(request);
GetIndexBuildProgressResponse r = indexBuildProgress.getData();
try {
String responseJson = JsonFormat.printer().print(r);
return responseJson;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -17,15 +17,55 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.GetLoadStateResponse;
import io.milvus.grpc.LoadState;
import io.milvus.param.R;
import io.milvus.param.collection.GetLoadStateParam;
import io.nosqlbench.adapter.milvus.exceptions.MilvusAwaitStateIncompleteError;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.OpGenerator;
import io.nosqlbench.adapters.api.scheduling.TimeoutPredicate;
public class MilvusGetLoadStateOp extends MilvusBaseOp<GetLoadStateParam> {
public MilvusGetLoadStateOp(MilvusServiceClient client, GetLoadStateParam request) {
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.locks.LockSupport;
public class MilvusGetLoadStateOp extends MilvusBaseOp<GetLoadStateParam> implements OpGenerator {
private final TimeoutPredicate<LoadState> timeoutPredicate;
private int tried;
private MilvusGetLoadStateOp nextOp;
private long lastAttemptAt = 0L;
public MilvusGetLoadStateOp(
MilvusServiceClient client,
GetLoadStateParam request,
LoadState awaitState,
Duration timeout,
Duration interval
) {
super(client, request);
this.timeoutPredicate = TimeoutPredicate.of(s -> s==awaitState, timeout, interval, true);
}
@Override
public Object applyOp(long value) {
return client.getLoadState(request);
this.nextOp = null;
timeoutPredicate.blockUntilNextInterval();
R<GetLoadStateResponse> getLoadStateResponse = client.getLoadState(request);
TimeoutPredicate.Result<LoadState> result = timeoutPredicate.test(getLoadStateResponse.getData().getState());
String message = result.status().name() + " await state " + result.value() + " at time " + result.timeSummary();
logger.info(message);
if (result.status()== TimeoutPredicate.Status.pending) {
nextOp=this;
}
return getLoadStateResponse;
}
@Override
public Op getNextOp() {
return this.nextOp;
}
}

View File

@ -36,6 +36,7 @@ public class MilvusInsertOp extends MilvusBaseOp<InsertParam> {
@Override
public R<MutationResult> applyOp(long value) {
return client.insert(request);
R<MutationResult> insert = client.insert(request);
return insert;
}
}

View File

@ -17,6 +17,8 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.LoadCollectionParam;
public class MilvusLoadCollectionOp extends MilvusBaseOp<LoadCollectionParam> {
@ -26,6 +28,7 @@ public class MilvusLoadCollectionOp extends MilvusBaseOp<LoadCollectionParam> {
@Override
public Object applyOp(long value) {
return client.loadCollection(request);
R<RpcStatus> rpcStatusR = client.loadCollection(request);
return rpcStatusR;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.resultwrappers;
import io.milvus.grpc.GetIndexBuildProgressResponse;
public class MVGetIndexBuildProgressRespones {
private final GetIndexBuildProgressResponse r;
public MVGetIndexBuildProgressRespones(GetIndexBuildProgressResponse r) {
this.r = r;
}
public int getPercent() {
if (getTotalRows()==getIndexedRows()) {
return 100;
}
double ratio = (double) getIndexedRows() / (double) getTotalRows();
return (int) (ratio*100.0d);
}
public long getTotalRows() {
return r.getTotalRows();
}
public long getIndexedRows() {
return r.getIndexedRows();
}
}

View File

@ -60,7 +60,7 @@ public class MongoCommandOpDispenser extends BaseOpDispenser<Op, MongoSpace> {
}
@Override
public Op apply(long cycle) {
public Op getOp(long cycle) {
return mongoOpF.apply(cycle);
}
}

Some files were not shown because too many files have changed in this diff Show More