part1 of aws os

This commit is contained in:
Jonathan Shook 2024-02-06 23:56:04 -06:00
parent 150fb79f01
commit 582d22af69
8 changed files with 122 additions and 24 deletions

View File

@ -85,11 +85,17 @@
<artifactId>aws-crt-client</artifactId>
<version>2.20.109</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<version>2.23.18</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>2.23.19</version>
</dependency>
</dependencies>

View File

@ -21,16 +21,29 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
public class OpenSearchAdapter extends BaseDriverAdapter<Op,OpenSearchSpace> {
public OpenSearchAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public Function<String, ? extends OpenSearchSpace> getSpaceInitializer(NBConfiguration cfg) {
return (String spaceName) -> new OpenSearchSpace(cfg);
}
@Override
public OpMapper<Op> getOpMapper() {
return new OpenSearchOpMapper(this);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(OpenSearchSpace.getConfigModel());
}
}

View File

@ -17,18 +17,27 @@
package io.nosqlbench.adapter.opensearch;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.profile.internal.BasicProfile;
import com.amazonaws.auth.profile.internal.ProfileKeyConstants;
import com.amazonaws.auth.profile.internal.ProfileStaticCredentialsProvider;
import io.nosqlbench.nb.api.config.standard.ConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.config.standard.Param;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.InfoResponse;
import org.opensearch.client.transport.aws.AwsSdk2Transport;
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions;
import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
public class OpenSearchSpace {
import java.io.IOException;
import java.util.Map;
public class OpenSearchSpace implements AutoCloseable {
private final NBConfiguration cfg;
protected OpenSearchClient client;
@ -38,7 +47,7 @@ public class OpenSearchSpace {
}
public synchronized OpenSearchClient getClient() {
if (client!=null) {
if (client == null) {
client = createClient();
}
return client;
@ -49,12 +58,39 @@ public class OpenSearchSpace {
Region selectedRegion = Region.of(region);
String host = cfg.get("host");
// https://ecj69wlzmday6hvr2586.eu-central-1.aoss.amazonaws.com
SdkAsyncHttpClient client1 = AwsCrtAsyncHttpClient.builder().build();
AwsSdk2TransportOptions transportOptions = AwsSdk2TransportOptions.builder().build();
AwsSdk2Transport awsSdk2Transport = new AwsSdk2Transport(client1,host,selectedRegion,transportOptions);
ProfileCredentialsProvider creds = ProfileCredentialsProvider.builder()
.profileName("686157956141_ENG-TESTENG_AWS-ENG-PERF")
.build();
SdkAsyncHttpClient httpClient =
AwsCrtAsyncHttpClient.builder()
.build();
AwsSdk2TransportOptions transportOptions =
AwsSdk2TransportOptions.builder()
.setCredentials(creds)
.build();
AwsSdk2Transport awsSdk2Transport =
new AwsSdk2Transport(
httpClient,
host,
"es",
selectedRegion,
transportOptions
);
OpenSearchClient client = new OpenSearchClient(awsSdk2Transport);
try {
InfoResponse info = client.info();
System.out.println(info.version().distribution() + ": " + info.version().number());
} catch (IOException e) {
throw new RuntimeException(e);
}
return client;
// // Create a new domain, update its configuration, and delete it.
@ -69,10 +105,17 @@ public class OpenSearchSpace {
public static NBConfigModel getConfigModel() {
return ConfigModel.of(OpenSearchSpace.class)
.add(Param.required("region", String.class).setDescription("The region to connect to"))
.add(Param.required("host",String.class).setDescription("The Open Search API endpoint"))
.add(Param.required("host", String.class).setDescription("The Open Search API endpoint host"))
.asReadOnly();
}
@Override
public void close() throws Exception {
if (client != null) {
client.shutdown();
}
}
// /**
// * Waits for the domain to finish processing changes. New domains typically take 15-30 minutes
@ -109,4 +152,5 @@ public class OpenSearchSpace {
// System.out.println("Domain description: "+describeResponse.toString());
// }
}

View File

@ -19,6 +19,7 @@ package io.nosqlbench.adapter.opensearch.dispensers;
import io.nosqlbench.adapter.opensearch.OpenSearchAdapter;
import io.nosqlbench.adapter.opensearch.ops.CreateIndexOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.*;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
@ -58,22 +59,30 @@ public class CreateIndexOpDispenser extends BaseOpenSearchOpDispenser {
@Override
public LongFunction<CreateIndexOp> createOpFunc(LongFunction<OpenSearchClient> clientF, ParsedOp op) {
CreateIndexRequest.Builder eb = new CreateIndexRequest.Builder();
LongFunction<CreateIndexRequest.Builder> bfunc = l -> new CreateIndexRequest.Builder();
LongFunction<CreateIndexRequest.Builder> bfunc = l -> new CreateIndexRequest.Builder().index("testindex1");
bfunc = op.enhanceFunc(bfunc, "mappings", Map.class, this::resolveTypeMapping);
LongFunction<CreateIndexRequest.Builder> finalBfunc = bfunc;
return (long l) -> new CreateIndexOp(clientF.apply(l), finalBfunc.apply(l).build());
}
private CreateIndexRequest.Builder resolveTypeMapping(CreateIndexRequest.Builder eb, Map<?,?> mappings) {
TypeMapping.Builder builder = new TypeMapping.Builder()
.properties(Map.of(
"p1", new Property.Builder().knnVector(new KnnVectorProperty.Builder()
.dimension(23)
.method(b -> b.spaceType("sdf"))
.build()
).build()
))
// https://opensearch.org/docs/latest/search-plugins/knn/knn-index/
private CreateIndexRequest.Builder resolveTypeMapping(CreateIndexRequest.Builder eb, Map<?, ?> mappings) {
TypeMapping.Builder builder = new TypeMapping.Builder().properties(
Map.of(
"p1",
new Property.Builder().knnVector(new KnnVectorProperty.Builder()
.dimension(23)
.method(
new KnnVectorMethod.Builder()
.name("hnsw")
.engine("faiss")
.spaceType("l2")
.parameters(Map.of("ef_construction", JsonData.of(256),"m",JsonData.of(8)))
.build()
).build()
).build()
))
.indexField(new IndexField.Builder()
.enabled(true).build())
.fieldNames(new FieldNamesField.Builder()

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.opensearch.ops;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.CreateIndexResponse;
import java.io.IOException;
@ -32,7 +33,8 @@ public class CreateIndexOp extends BaseOpenSearchOp {
@Override
public Object apply(long value) {
try {
return client.indices().create(rq);
CreateIndexResponse response = client.indices().create(rq);
return response;
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -13,3 +13,4 @@ blocks:
index:
create_index: TEMPLATE(indexname,vectors_index)
mappings:
m1: v1

View File

@ -95,12 +95,12 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
dispenser.onStart(cycle);
try (Timer.Context ct = executeTimer.time()) {
if (op instanceof RunnableOp) {
((RunnableOp) op).run();
} else if (op instanceof CycleOp<?>) {
result = ((CycleOp<?>) op).apply(cycle);
} else if (op instanceof ChainingOp) {
result = ((ChainingOp) op).apply(result);
if (op instanceof RunnableOp runnableOp) {
runnableOp.run();
} else if (op instanceof CycleOp<?> cycleOp) {
result = cycleOp.apply(cycle);
} else if (op instanceof ChainingOp chainingOp) {
result = chainingOp.apply(result);
} else {
throw new RuntimeException("The op implementation did not implement any active logic. Implement " +
"one of [RunnableOp, CycleOp, or ChainingOp]");

View File

@ -0,0 +1,23 @@
package io.nosqlbench.adapters.api.templating;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public interface TriFunction <A,B,C> {
public A apply(A a,B b,C c);
}