Merge branch 'main' into jshook/update-paging-fix

This commit is contained in:
Jonathan Shook 2023-08-30 01:11:53 -05:00
commit 8062e5b3ac
13 changed files with 391 additions and 33 deletions

View File

@ -20,6 +20,25 @@ import java.util.Arrays;
public class Intersections {
/**
 * Counts the number of values present in both arrays.
 * Both inputs must be sorted in ascending order; the two cursors are
 * advanced in a single merge-style pass, so this runs in O(n + m).
 *
 * @param reference first sorted array
 * @param sample    second sorted array
 * @return how many elements the two arrays have in common
 */
public static long count(long[] reference, long[] sample) {
    int refPos = 0;
    int samplePos = 0;
    long shared = 0;
    while (refPos < reference.length && samplePos < sample.length) {
        long refVal = reference[refPos];
        long sampleVal = sample[samplePos];
        if (refVal == sampleVal) {
            // same value on both sides: count it and advance both cursors
            shared++;
            refPos++;
            samplePos++;
        } else if (sampleVal < refVal) {
            samplePos++;
        } else {
            refPos++;
        }
    }
    return shared;
}
public static long[] find(long[] reference, long[] sample) {
long[] result = new long[reference.length];
int a_index = 0, b_index = 0, acc_index = -1;
@ -62,6 +81,25 @@ public class Intersections {
return Arrays.copyOfRange(result,0,acc_index+1);
}
/**
 * Counts the number of values present in both arrays.
 * int[] overload of the long[] variant; both inputs must be sorted in
 * ascending order. Single merge-style pass, O(n + m).
 *
 * @param reference first sorted array
 * @param sample    second sorted array
 * @return how many elements the two arrays have in common
 */
public static int count(int[] reference, int[] sample) {
    int refPos = 0;
    int samplePos = 0;
    int shared = 0;
    while (refPos < reference.length && samplePos < sample.length) {
        int refVal = reference[refPos];
        int sampleVal = sample[samplePos];
        if (refVal == sampleVal) {
            // same value on both sides: count it and advance both cursors
            shared++;
            refPos++;
            samplePos++;
        } else if (sampleVal < refVal) {
            samplePos++;
        } else {
            refPos++;
        }
    }
    return shared;
}
public static int[] resize(int[] arr) {
int len = arr.length;
int[] copy = new int[len + 1];

View File

@ -33,4 +33,15 @@ class IntersectionsTest {
assertThat(result).isEqualTo(new long[]{4,5});
}
@Test
public void testCountIntIntersection() {
    // int[] overload: 1, 3 and 9 are common to both sorted inputs
    long matched = Intersections.count(new int[]{1,3,5,7,9}, new int[]{1,2,3,9,10});
    assertThat(matched).isEqualTo(3L);
}
@Test
public void testCountLongIntersection() {
    // long[] overload: 1, 3 and 9 are common to both sorted inputs.
    // Use a long literal (3L) for consistency with the sibling int test,
    // since count(long[], long[]) returns long.
    long result = Intersections.count(new long[]{1,3,5,7,9}, new long[]{1,2,3,9,10});
    assertThat(result).isEqualTo(3L);
}
}

View File

@ -233,6 +233,9 @@ public class HttpConsoleFormats {
if (contentType.toLowerCase().startsWith("text")) {
return true;
}
if (contentType.toLowerCase().startsWith("application/json")) {
return true;
}
return PRINTABLE.contains(contentType.split("/")[0].toLowerCase());
}

View File

@ -0,0 +1,83 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.extensions.vectormath;
import java.util.Arrays;
/**
 * Sorted-array intersection helpers used by scripted result verification.
 * Both inputs to {@code find} must be sorted in ascending order.
 */
public class PineconeIntersections {

    /**
     * Returns the elements common to both ascending-sorted arrays.
     * <p>
     * Fix: {@code result} is preallocated to {@code reference.length}, which
     * is already an upper bound on the intersection size, so the original
     * per-match resize-and-copy (worst case O(n^2) copying) was removed; the
     * buffer is trimmed once at the end instead.
     *
     * @param reference sorted reference values
     * @param sample    sorted sample values
     * @return the common values, in ascending order
     */
    public static long[] find(long[] reference, long[] sample) {
        long[] result = new long[reference.length];
        int a_index = 0, b_index = 0, acc_index = -1;
        while (a_index < reference.length && b_index < sample.length) {
            long a_element = reference[a_index];
            long b_element = sample[b_index];
            if (a_element == b_element) {
                result[++acc_index] = a_element;
                a_index++;
                b_index++;
            } else if (b_element < a_element) {
                // sample cursor is behind; advance it
                b_index++;
            } else {
                a_index++;
            }
        }
        // trim to the number of matches actually recorded
        return Arrays.copyOfRange(result, 0, acc_index + 1);
    }

    /**
     * int[] overload of {@link #find(long[], long[])}; same contract and the
     * same fix (no per-match reallocation).
     */
    public static int[] find(int[] reference, int[] sample) {
        int[] result = new int[reference.length];
        int a_index = 0, b_index = 0, acc_index = -1;
        while (a_index < reference.length && b_index < sample.length) {
            int a_element = reference[a_index];
            int b_element = sample[b_index];
            if (a_element == b_element) {
                result[++acc_index] = a_element;
                a_index++;
                b_index++;
            } else if (b_element < a_element) {
                b_index++;
            } else {
                a_index++;
            }
        }
        return Arrays.copyOfRange(result, 0, acc_index + 1);
    }

    /** Returns a copy of arr with one extra zero-initialized slot appended. */
    public static int[] resize(int[] arr) {
        // Arrays.copyOf replaces the original hand-rolled element copy loop
        return Arrays.copyOf(arr, arr.length + 1);
    }

    /** Returns a copy of arr with one extra zero-initialized slot appended. */
    public static long[] resize(long[] arr) {
        return Arrays.copyOf(arr, arr.length + 1);
    }
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.extensions.vectormath;
import io.pinecone.proto.QueryResponse;
import io.pinecone.proto.ScoredVector;
import java.util.Arrays;
/**
 * Scripting helpers for converting Pinecone query responses into primitive
 * arrays and computing recall against a set of reference indexes.
 */
public class PineconeVectorMath {

    /** Parses each element of strings as a decimal long. */
    public static long[] stringArrayAsALongArray(String[] strings) {
        long[] longs = new long[strings.length];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = Long.parseLong(strings[i]);
        }
        return longs;
    }

    /** Parses each element of strings as a decimal int. */
    public static int[] stringArrayAsIntArray(String[] strings) {
        int[] ints = new int[strings.length];
        for (int i = 0; i < ints.length; i++) {
            ints[i] = Integer.parseInt(strings[i]);
        }
        return ints;
    }

    /** Extracts the id of every matched vector in the response, in match order. */
    public static String[] idsToStringArray(QueryResponse response) {
        return response.getMatchesList().stream().map(ScoredVector::getId).toArray(String[]::new);
    }

    /** Extracts every match id as an int; ids must be decimal integer strings. */
    public static int[] idsToIntArray(QueryResponse response) {
        return response.getMatchesList().stream().mapToInt(r -> Integer.parseInt(r.getId())).toArray();
    }

    /**
     * Computes the fraction of referenceIndexes that also occur in sampleIndexes.
     * <p>
     * Fix: the original sorted the caller's arrays in place; this version sorts
     * defensive copies so inputs are no longer mutated as a side effect.
     *
     * @return intersection size divided by referenceIndexes.length
     */
    public static double computeRecall(long[] referenceIndexes, long[] sampleIndexes) {
        long[] ref = referenceIndexes.clone();
        long[] sample = sampleIndexes.clone();
        Arrays.sort(ref);
        Arrays.sort(sample);
        long[] intersection = PineconeIntersections.find(ref, sample);
        return (double) intersection.length / (double) ref.length;
    }

    /** int[] overload of {@link #computeRecall(long[], long[])}; also sorts copies. */
    public static double computeRecall(int[] referenceIndexes, int[] sampleIndexes) {
        int[] ref = referenceIndexes.clone();
        int[] sample = sampleIndexes.clone();
        Arrays.sort(ref);
        Arrays.sort(sample);
        int[] intersection = PineconeIntersections.find(ref, sample);
        return (double) intersection.length / (double) ref.length;
    }
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.extensions.vectormath;
import com.codahale.metrics.MetricRegistry;
import io.nosqlbench.api.config.LabeledScenarioContext;
import io.nosqlbench.api.extensions.ScriptingPluginInfo;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.Logger;
/**
 * Registers {@link PineconeVectorMath} as a scripting plugin under the
 * selector "pinecone_vectormath" via the {@code @Service} SPI annotation.
 */
@Service(value = ScriptingPluginInfo.class,selector = "pinecone_vectormath")
public class PineconeVectorMathPluginInfo implements ScriptingPluginInfo<PineconeVectorMath> {
/** Human-readable description of what this plugin provides. */
@Override
public String getDescription() {
return "various methods and utilities for working with vector math in a scripted environment";
}
/**
 * Creates the extension object exposed to the scripting environment.
 * The logger, registry, and context parameters are unused here; the
 * extension is stateless.
 */
@Override
public PineconeVectorMath getExtensionObject(Logger logger, MetricRegistry metricRegistry, LabeledScenarioContext scriptContext) {
return new PineconeVectorMath();
}
}

View File

@ -1,36 +1,33 @@
scenarios:
default:
mixed: run driver=pinecone cycles=1000 apiKey=2f55b2f0-670f-4c51-9073-4d37142b761a projectName=a850334 environment=us-east-1-aws tags='block:main-.*'
verify: run driver=pinecone cycles=10 threads=10 apiKey=6503c344-5967-421d-b19a-3e7955842253 projectName=f88a480 environment=eu-west4-gcp tags=block:verify
bindings:
id: Mod(<<keycount:1000000000>>); ToString() -> String
vector_value: CircleVectors(100000, "io.nosqlbench.virtdata.library.basics.shared.vectors.algorithms.GoldenAngle")
state: StateCodes()
vector: HdfFileToFloatList("glove-25-angular.hdf5", "/test")
validation_set: HdfFileToIntArray("glove-25-angular.hdf5", "/neighbors")
blocks:
# main-write:
# params:
# ratio: 1
# ops:
# op1:
# upsert: "circles"
# namespace: "example_namespace"
# upsert_vectors:
# - id: "{id}"
# values: "{vector_value}"
# metadata:
# state: "{state}"
main-read:
params:
ratio: 1
verify:
ops:
op1:
query: "circles"
vector: "{vector_value}"
ops1:
query: "glove25"
namespace: "example_namespace"
top_k: 1
vector: "{vector}"
top_k: 100
include_values: true
include_metadata: true
#result should be type QueryResponse
include_metadata: false
verifier-imports:
- "io.nosqlbench.api.engine.metrics.ActivityMetrics"
- "io.nosqlbench.engine.extensions.vectormath.PineconeIntersections"
- "io.nosqlbench.engine.extensions.vectormath.PineconeVectorMath"
verifier-init: |
recallHisto = ActivityMetrics.histogram(_parsed_op,"recall-histo",4);
verifier: |
result.getMatchesList().get(0).getValuesList()=={vector_value}
found_string_ids=PineconeVectorMath.idsToStringArray(result);
found_int_ids=PineconeVectorMath.stringArrayAsIntArray(found_string_ids);
print(found_int_ids)
print({validation_set})
recall = PineconeVectorMath.computeRecall(found_int_ids, {validation_set})
print(recall)
recallHisto.update((long)(recall*1000000.0));
return true;

View File

@ -215,6 +215,7 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
final String dockerMetricsAt = globalOptions.wantsDockerMetricsAt();
String reportGraphiteTo = globalOptions.wantsReportGraphiteTo();
String annotatorsConfig = globalOptions.getAnnotatorsConfig();
String promPushConfig = globalOptions.getPromPushConfig();
final String reportPromPushTo = globalOptions.wantsReportPromPushTo();
@ -413,7 +414,7 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
final MetricReporters reporters = MetricReporters.getInstance();
reporters.addRegistry("workloads", ActivityMetrics.getMetricRegistry());
if (null != reportPromPushTo) reporters.addPromPush(reportPromPushTo, options.wantsMetricsPrefix());
if (null != reportPromPushTo) reporters.addPromPush(reportPromPushTo, options.wantsMetricsPrefix(), promPushConfig);
if (null != reportGraphiteTo) reporters.addGraphite(reportGraphiteTo, options.wantsMetricsPrefix());
if (null != options.wantsReportCsvTo())
reporters.addCSVReporter(options.wantsReportCsvTo(), options.wantsMetricsPrefix());

View File

@ -53,6 +53,7 @@ public class NBCLIOptions {
private static final String METRICS_PREFIX = "--metrics-prefix";
private static final String ANNOTATE_EVENTS = "--annotate";
private static final String ANNOTATORS_CONFIG = "--annotators";
private static final String PROMPUSH_CONFIG = "--prompush";
// Enabled if the TERM env var is provided
private static final String ANSI = "--ansi";
@ -184,6 +185,7 @@ public class NBCLIOptions {
private String[] annotateEvents = {"ALL"};
private String dockerMetricsHost;
private String annotatorsConfig = "";
private String promPushConfig = "";
private String statedirs = NBStatePath.NB_STATEDIR_PATHS;
private Path statepath;
private final String hdrForChartFileName = NBCLIOptions.DEFAULT_CHART_HDR_LOG_NAME;
@ -208,6 +210,10 @@ public class NBCLIOptions {
return this.annotatorsConfig;
}
/**
 * Returns the reporter configuration string supplied via the
 * {@code --prompush} option, or the empty string when the option was not given.
 */
public String getPromPushConfig() {
return this.promPushConfig;
}
public NBLabels getLabelMap() {
return this.labels;
}
@ -376,6 +382,10 @@ public class NBCLIOptions {
arglist.removeFirst();
this.reportPromPushTo = arglist.removeFirst();
break;
case NBCLIOptions.PROMPUSH_CONFIG:
arglist.removeFirst();
promPushConfig = this.readWordOrThrow(arglist, "prompush config");
break;
case NBCLIOptions.GRAPHITE_LOG_LEVEL:
arglist.removeFirst();
this.graphitelogLevel = arglist.removeFirst();

View File

@ -115,7 +115,7 @@ public class MetricReporters implements Shutdownable {
return this;
}
public MetricReporters addPromPush(final String reportPromPushTo, final String prefix) {
public MetricReporters addPromPush(final String reportPromPushTo, final String prefix, final String config) {
logger.debug(() -> "Adding prompush reporter to " + reportPromPushTo + " with prefix label to " + prefix);
@ -131,7 +131,8 @@ public class MetricReporters implements Shutdownable {
"prompush",
MetricFilter.ALL,
TimeUnit.SECONDS,
TimeUnit.NANOSECONDS
TimeUnit.NANOSECONDS,
config
);
scheduledReporters.add(promPushReporter);
}

View File

@ -291,7 +291,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.519</version>
<version>1.12.523</version>
</dependency>
<dependency>
<groupId>com.elega9t</groupId>

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.engine.metrics.reporters;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;
/**
 * Lazily reads a prom-push API key from a file.
 * {@link #get()} re-reads the file on every call and returns the trimmed
 * contents, or null when the file does not exist.
 */
public class PromPushKeyFileReader implements Supplier<String> {
    private static final Logger logger = LogManager.getLogger("METRICS");

    // location of the key file; only read when get() is invoked
    private final Path keyfilePath;

    public PromPushKeyFileReader(Path path) {
        this.keyfilePath = path;
    }

    public PromPushKeyFileReader(String sourcePath) {
        this.keyfilePath = Path.of(sourcePath);
    }

    /**
     * @return the trimmed key text, or null if the keyfile is absent
     * @throws RuntimeException wrapping any IOException raised while reading
     */
    @Override
    public String get() {
        if (!Files.exists(keyfilePath)) {
            // Fix: the original message had an unbalanced opening quote and
            // concatenated the path; use log4j '{}' parameter substitution.
            logger.warn("apikeyfile does not exist at '{}'", keyfilePath);
            return null;
        }
        try {
            String apikey = Files.readString(keyfilePath, StandardCharsets.UTF_8);
            return apikey.trim();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -16,6 +16,9 @@
package io.nosqlbench.api.engine.metrics.reporters;
import io.nosqlbench.api.config.params.ParamsParser;
import io.nosqlbench.api.config.standard.*;
import com.codahale.metrics.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -28,19 +31,25 @@ import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
public class PromPushReporter extends ScheduledReporter {
public class PromPushReporter extends ScheduledReporter implements NBConfigurable {
private static final Logger logger = LogManager.getLogger(PromPushReporter.class);
private HttpClient client;
private final URI uri;
private String bearerToken;
private boolean needsAuth;
public PromPushReporter(
final String targetUriSpec,
@ -48,12 +57,56 @@ public class PromPushReporter extends ScheduledReporter {
String name,
MetricFilter filter,
TimeUnit rateUnit,
TimeUnit durationUnit
TimeUnit durationUnit,
final String config
) {
super(registry, name, filter, rateUnit, durationUnit);
uri = URI.create(targetUriSpec);
needsAuth = false;
ConfigLoader loader = new ConfigLoader();
List<Map> configs = loader.load(config, Map.class);
NBConfigModel cm = this.getConfigModel();
if (configs != null) {
logger.info("PromPushReporter process configuration: %s", config);
for (Map cmap : configs) {
NBConfiguration cfg = cm.apply(cmap);
this.applyConfig(cfg);
}
} else {
logger.info("PromPushReporter default configuration");
HashMap<String,String> junk = new HashMap<>(Map.of());
NBConfiguration cfg = cm.apply(junk);
this.applyConfig(cfg);
}
}
/**
 * Declares the settings this reporter accepts: "apikeyfile", defaulting to
 * $NBSTATEDIR/prompush/prompush_apikey and (per its description) superseding
 * "apikey"; and an optional "apikey" string. Returned model is read-only.
 */
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(Param.defaultTo("apikeyfile", "$NBSTATEDIR/prompush/prompush_apikey")
.setDescription("The file that contains the api key, supersedes apikey"))
.add(Param.optional("apikey", String.class)
.setDescription("The api key to use"))
.asReadOnly();
}
/**
 * Resolves the bearer token from configuration: prefer the key file named by
 * "apikeyfile", then fall back to a directly supplied "apikey".
 * <p>
 * Fixes: (1) the log call used printf-style "%s" with log4j's parameterized
 * logger, so the path was never substituted — use "{}"; (2) when the key file
 * is absent, PromPushKeyFileReader.get() returns null and the original built
 * the literal token "Bearer null" with auth enabled — now a null key falls
 * back to "apikey", and auth stays disabled if neither yields a token.
 */
@Override
public void applyConfig(NBConfiguration cfg) {
    Optional<String> optionalApikeyfile = cfg.getEnvOptional("apikeyfile");
    Optional<String> optionalApikey = cfg.getOptional("apikey");
    bearerToken = null;
    if (optionalApikeyfile.isPresent()) {
        Path keyfilePath = optionalApikeyfile.map(Path::of).orElseThrow();
        logger.info("Reading Bearer Token from {}", keyfilePath);
        String key = new PromPushKeyFileReader(keyfilePath).get();
        if (key != null) {
            bearerToken = "Bearer " + key;
        }
    }
    if (bearerToken == null && optionalApikey.isPresent()) {
        bearerToken = "Bearer " + optionalApikey.get();
    }
    needsAuth = (null != bearerToken);
}
@Override
public synchronized void report(
SortedMap<String, Gauge> gauges,
@ -88,7 +141,11 @@ public class PromPushReporter extends ScheduledReporter {
while (0 < remainingRetries) {
remainingRetries--;
final HttpClient client = getCachedClient();
final HttpRequest request = HttpRequest.newBuilder().uri(uri).POST(BodyPublishers.ofString(exposition)).build();
final HttpRequest.Builder rb = HttpRequest.newBuilder().uri(uri);
if ( needsAuth ) {
rb.setHeader("Authorization", bearerToken );
}
final HttpRequest request = rb.POST(BodyPublishers.ofString(exposition)).build();
final BodyHandler<String> handler = HttpResponse.BodyHandlers.ofString();
HttpResponse<String> response = null;
try {