Merge branch 'main' into my-astra-schema-examples

This commit is contained in:
Dave Fisher
2024-07-23 12:47:59 -07:00
committed by GitHub
33 changed files with 714 additions and 142 deletions

View File

@@ -53,7 +53,7 @@ public class FieldDestructuringMapper implements Function<Map<String, Object>, M
return stringObjectMap;
}
} else {
throw new RuntimeException("During op mapping, can't parse something that is not a CharSequence: '" + fieldname + "' (type is " + o.getClass().getCanonicalName() + ")");
throw new RuntimeException("During op mapping, can't parse something that is not a CharSequence: '" + fieldname + "' (type is " + o.getClass().getCanonicalName() + ") (value is:" + o.toString());
}
} else {
return stringObjectMap;

View File

@@ -99,12 +99,37 @@ public class NBCreators {
}
public WindowSummaryGauge windowSummaryGauge(
String name,
int window,
List<String> statspecs,
MetricCategory category,
String description
) {
List<WindowSummaryGauge.Stat> stats =
statspecs.stream().map(WindowSummaryGauge.Stat::valueOf).toList();
DoubleSummaryStatistics reservoir = new DoubleSummaryStatistics();
WindowSummaryGauge anyGauge = null;
for (WindowSummaryGauge.Stat stat : stats) {
anyGauge = new WindowSummaryGauge(
window,
base.getLabels().and(NBLabels.forKV("name", name+"_w"+window, "stat", stat)),
stat,
description,
category
);
base.addComponentMetric(anyGauge, category, description);
}
return anyGauge;
}
public DoubleSummaryGauge summaryGauge(String name, List<String> statspecs, MetricCategory category, String description) {
List<DoubleSummaryGauge.Stat> stats = statspecs.stream().map(DoubleSummaryGauge.Stat::valueOf).toList();
DoubleSummaryStatistics reservoir = new DoubleSummaryStatistics();
DoubleSummaryGauge anyGauge = null;
for (DoubleSummaryGauge.Stat stat : stats) {
anyGauge = new DoubleSummaryGauge(base.getLabels().and(NBLabels.forKV("name",name,"stat", stat)), stat, reservoir, description, category);
anyGauge = new DoubleSummaryGauge(base.getLabels().and(NBLabels.forKV("name", name, "stat", stat)), stat, reservoir, description, category);
base.addComponentMetric(anyGauge, category, description);
}
return anyGauge;
@@ -113,6 +138,7 @@ public class NBCreators {
public NBMetricHistogram histogram(String metricFamilyName, MetricCategory category, String description) {
return histogram(metricFamilyName,4, category, description);
}
public NBMetricHistogram histogram(String metricFamilyName, int hdrdigits, MetricCategory category, String description) {
NBLabels labels = base.getLabels().and("name", metricFamilyName);
NBMetricHistogram histogram = new NBMetricHistogram(labels, new DeltaHdrHistogramReservoir(labels, hdrdigits), description, category);
@@ -120,7 +146,7 @@ public class NBCreators {
return histogram;
}
// public AttachedMetricsSummaryReporter summaryReporter(long millis, String... labelspecs) {
// public AttachedMetricsSummaryReporter summaryReporter(long millis, String... labelspecs) {
// logger.debug("attaching summary reporter to " + base.description());
// NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
// AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, millis);
@@ -198,7 +224,7 @@ public class NBCreators {
private Logger logger = LogManager.getLogger(Log4JMetricsReporter.class);
private Log4JMetricsReporter.LoggingLevel loggingLevel = Log4JMetricsReporter.LoggingLevel.INFO;
private Marker marker;
private MetricFilter filter= new MetricInstanceFilter();
private MetricFilter filter = new MetricInstanceFilter();
private boolean oneLastTime = false;
private NBLabels labels;
private long millis = 1000;
@@ -206,34 +232,42 @@ public class NBCreators {
public Log4jReporterBuilder(NBComponent component) {
this.component = component;
}
public Log4jReporterBuilder oneLastTime(final boolean oneLastTime) {
this.oneLastTime = oneLastTime;
return this;
}
public Log4jReporterBuilder interval(final int interval) {
this.millis = interval;
return this;
}
public Log4jReporterBuilder outputTo(final Logger logger) {
this.logger = logger;
return this;
}
public Log4jReporterBuilder markWith(final Marker marker) {
this.marker = marker;
return this;
}
public Log4jReporterBuilder labels(final NBLabels labels) {
this.labels = labels;
return this;
}
public Log4jReporterBuilder filter(final MetricFilter filter) {
this.filter = filter;
return this;
}
public Log4jReporterBuilder withLoggingLevel(final Log4JMetricsReporter.LoggingLevel loggingLevel) {
this.loggingLevel = loggingLevel;
return this;
}
public Log4JMetricsReporter build() {
final LoggerProxy loggerProxy = switch (this.loggingLevel) {
case TRACE -> new TraceLoggerProxy(this.logger);
@@ -245,6 +279,7 @@ public class NBCreators {
return new Log4JMetricsReporter(this.component, loggerProxy, this.marker, this.filter, this.labels, this.millis, this.oneLastTime);
}
}
/* private class to allow logger configuration */
public abstract static class LoggerProxy {
protected final Logger logger;
@@ -356,22 +391,27 @@ public class NBCreators {
this.component = component;
this.output = output;
}
public ConsoleReporterBuilder labels(NBLabels labels) {
this.labels = labels;
return this;
}
public ConsoleReporterBuilder interval(int interval) {
this.interval = interval;
return this;
}
public ConsoleReporterBuilder oneLastTime(boolean oneLastTime) {
this.oneLastTime = oneLastTime;
return this;
}
public ConsoleReporterBuilder disabledMetricAttributes(Set<MetricAttribute> disabledMetricAttributes) {
this.disabledMetricAttributes = disabledMetricAttributes;
return this;
}
public ConsoleReporter build() {
return new ConsoleReporter(component, labels, interval, oneLastTime, output, disabledMetricAttributes);
}
@@ -387,10 +427,12 @@ public class NBCreators {
this.component = component;
this.filename = filename;
}
public CsvOutputWriterBuilder headers(String... headers) {
this.headers = headers;
return this;
}
public CsvOutputPluginWriter build() {
return new CsvOutputPluginWriter(component, filename, headers);
}
@@ -406,26 +448,32 @@ public class NBCreators {
public CsvReporterBuilder(NBComponent component) {
this.component = component;
}
public CsvReporterBuilder labels(NBLabels labels) {
this.labels = labels;
return this;
}
public CsvReporterBuilder path(Path reportTo) {
this.reportTo = reportTo;
return this;
}
public CsvReporterBuilder path(String reportTo) {
this.reportTo = Path.of(reportTo);
return this;
}
public CsvReporterBuilder interval(int interval) {
this.interval = interval;
return this;
}
public CsvReporterBuilder filter(MetricInstanceFilter filter) {
this.filter = filter;
return this;
}
public CsvReporter build() {
return new CsvReporter(component, reportTo, interval, filter, labels);
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.engine.metrics;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.stats.StatBucket;
import java.util.DoubleSummaryStatistics;
import java.util.function.DoubleConsumer;
/**
* Create a discrete stat reservoir as a gauge.
*/
public class WindowSummaryGauge implements NBMetricGauge, DoubleConsumer {
private final NBLabels labels;
private final Stat stat;
private final StatBucket stats;
private final String description;
private final MetricCategory[] categories;
private final int window;
@Override
public String typeName() {
return "gauge";
}
@Override
public String getDescription() {
return this.description;
}
@Override
public MetricCategory[] getCategories() {
return this.categories;
}
public enum Stat {
Min,
Max,
Average,
Count,
Sum
}
public WindowSummaryGauge(int window, NBLabels labels, Stat stat, String description,
MetricCategory... categories) {
this.labels = labels;
this.stat = stat;
this.description = description;
this.categories = categories;
this.window = window;
this.stats = new StatBucket(window);
}
public synchronized void accept(double value) {
stats.apply(value);
}
@Override
public synchronized Double getValue() {
return switch(stat) {
case Min -> stats.getMin();
case Max -> stats.getMax();
case Average -> stats.getAverage();
case Count -> (double) stats.getCount();
case Sum -> stats.getSum();
};
}
@Override
public NBLabels getLabels() {
return labels;
}
@Override
public String toString() {
return this.labels.toString()+":"+this.stats.toString();
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.engine.metrics.wrappers;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.metrics.WindowSummaryGauge;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.labels.NBLabeledElement;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.stats.StatBucket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class WindowedRelevancyMeasures implements NBLabeledElement {
private final NBComponent parent;
private final NBLabels labels;
private final List<RelevancyFunction> functions = new ArrayList<>();
private final List<WindowSummaryGauge> gauges = new ArrayList<>();
private final int window;
private int offset = 0;
public WindowedRelevancyMeasures(NBComponent parent, int window) {
this(parent, NBLabels.forKV(),window);
}
public WindowedRelevancyMeasures(NBComponent parent, NBLabels labels, int window) {
this.parent = parent;
this.labels = labels;
this.window = window;
}
public WindowedRelevancyMeasures(NBComponent parent, Map<String, String> labels, int window) {
this(parent, NBLabels.forMap(labels), window);
}
@Override
public NBLabels getLabels() {
return parent.getLabels().and(labels);
}
public WindowedRelevancyMeasures addFunction(RelevancyFunction... f) {
for (RelevancyFunction function : f) {
this.functions.add(function);
function.prependLabels(this);
WindowSummaryGauge gauge = parent.create().windowSummaryGauge(
function.getUniqueName(),
window,
List.of("Average"),
MetricCategory.Accuracy,
WindowSummaryGauge.Stat.Average.toString()
);
this.gauges.add(gauge);
}
return this;
}
public void accept(int[] relevant, int[] actual) {
offset++;
if (offset >= window) {
offset = 0;
}
for (int i = 0; i < functions.size(); i++) {
double metricValue = functions.get(i).apply(relevant, actual);
gauges.get(i).accept(metricValue);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (WindowSummaryGauge gauge : gauges) {
sb.append(gauge.toString()).append("\n");
}
return sb.toString();
}
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.stats;
public class DoubleRing {
private final double[] dbuf;
private int count;
private int idx;
public DoubleRing(int size) {
this.dbuf = new double[size];
this.count = 0;
}
public DoubleRing(double[] samples) {
this.dbuf=samples;
this.count =samples.length;
}
public double push(double value) {
double ejected = (count == dbuf.length) ? dbuf[idx] : Double.NaN;
count += (count < dbuf.length) ? 1 : 0;
dbuf[idx] = value;
idx = (idx + 1) % dbuf.length;
return ejected;
}
public int size() {
return dbuf.length;
}
public int count() {
return count;
}
public double min() {
double min = Double.MAX_VALUE;
for (int i = 0; i < count; i++) {
min = Math.min(min,dbuf[i]);
}
return min;
}
public double max() {
double max = Double.MIN_VALUE;
for (int i = 0; i < count; i++) {
max = Math.max(max,dbuf[i]);
}
return max;
}
}

View File

@@ -0,0 +1,134 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.stats;
import java.util.Objects;
/**
* This is a relatively efficient statistics bucket which can maintain moving
* aggregates over a window of samples for count, mean, variance, stddev, sum.
* This is particularly useful when you know that each update to the data
* will likely be used in a query.
*/
public final class StatBucket {
DoubleRing ringbuf;
private double mean;
private double dSquared = 0.0d;
public StatBucket() {
this(10);
}
public StatBucket(int sampleWindow) {
this.ringbuf = new DoubleRing(sampleWindow);
}
public StatBucket(double[] samples) {
this.ringbuf = new DoubleRing(samples);
}
public StatBucket apply(double value) {
// System.out.println("stat->" + value + " bucket:" + toString());
double popped = ringbuf.push(value);
if (ringbuf.count() == 1) {
mean = value;
dSquared = 0.0d;
} else if (Double.isNaN(popped)) {
var newMean = mean + ((value - mean) / ringbuf.count());
var dSquaredIncrement = ((value - newMean) * (value - mean));
// If this value is too small to be interpreted as a double it gets converted to
// zero, which is not what we want. So we use the smallest possible double value
if (dSquaredIncrement == 0) dSquaredIncrement = Double.MIN_VALUE;
dSquared += dSquaredIncrement;
mean = newMean;
} else {
var meanIncrement = (value - popped) / ringbuf.count();
var newMean = mean + meanIncrement;
var dSquaredIncrement = ((value - popped) * (value - newMean + popped - mean));
// If this value is too small to be interpreted as a double it gets converted to
// zero, which is not what we want. So we use the smallest possible double value
if (dSquaredIncrement == 0) dSquaredIncrement = Double.MIN_VALUE;
var newDSquared = this.dSquared + dSquaredIncrement;
mean = newMean;
dSquared = newDSquared;
}
return this;
}
public double variance() {
double variance = dSquared / ringbuf.count();
return (variance < 0) ? Math.abs(variance) : variance;
}
public double stddev() {
return Math.sqrt(variance());
}
public int count() {
return ringbuf.count();
}
public double mean() {
return mean;
}
@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (StatBucket) obj;
return this.ringbuf.count() == that.ringbuf.count() &&
Double.doubleToLongBits(this.mean) == Double.doubleToLongBits(that.mean);
}
@Override
public int hashCode() {
return Objects.hash(ringbuf.count(), mean);
}
@Override
public String toString() {
return "StatBucket[" +
"count=" + ringbuf.count() + ", " +
"mean=" + mean + ", " +
"stddev=" + stddev() + ", " +
"variance=" + variance() + ']';
}
public boolean primed() {
return this.count()== ringbuf.size();
}
public double getMin() {
return ringbuf.min();
}
public double getMax() {
return ringbuf.max();
}
public double getAverage() {
return this.mean();
}
public double getCount() {
return count();
}
public double getSum() {
return this.mean() * this.count();
}
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.stats;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
class StatBucketTest {
@Test
public void testStreamingMean() {
var bucket = new StatBucket();
bucket.apply(5.0d);
assertThat(bucket.mean()).isCloseTo(5.0d, Offset.offset(0.001d));
bucket.apply(10.0d);
assertThat(bucket.mean()).isCloseTo(7.5d, Offset.offset(0.001d));
bucket.apply(15.0d);
assertThat(bucket.mean()).isCloseTo(10.0d, Offset.offset(0.001d));
bucket.apply(20.0d);
assertThat(bucket.mean()).isCloseTo(12.5d, Offset.offset(0.001d));
}
@Test
public void testStreamingComputations() {
double[] samples = new double[]{2, 4, 4, 4, 5, 5, 7, 9};
var bucket = new StatBucket(8);
for (int i = 0; i < samples.length * 10; i++) {
bucket.apply(samples[i % samples.length]);
if (i > 0 && (i % samples.length) == 0) {
assertThat(bucket.mean()).isCloseTo(5, Offset.offset(0.001d));
assertThat(bucket.stddev()).isCloseTo(2.0, Offset.offset(0.001d));
}
}
}
@Test
public void testErrorAccumulation1() {
var bucket = new StatBucket(11);
for (long base = 1; base <10000000000000000L ; base*=10) {
for (int i = 0; i< 10; i++) {
long value = base+i;
bucket.apply(value);
}
for (int i = 10; i < 20; i++) {
long value = base+i;
bucket.apply(value);
double streamingMean = bucket.mean();
assertThat(streamingMean).isCloseTo((double)(value-5), Offset.offset(0.03d));
}
}
}
}