refinements for csv logging

This commit is contained in:
Jonathan Shook
2023-10-16 22:09:32 -05:00
parent fe1d3193a4
commit e44b5a21b7
15 changed files with 279 additions and 98 deletions

View File

@@ -24,7 +24,6 @@ import org.apache.logging.log4j.LogManager;
*/
public class PeriodicRunnable<T extends Runnable> implements Runnable, AutoCloseable {
private static final Logger logger = LogManager.getLogger(PeriodicRunnable.class);
private final long intervalMillis;
private final T action;
private Thread thread;

View File

@@ -42,9 +42,9 @@ public class ConsoleReporter extends PeriodicTaskComponent {
private final long rateFactor;
private final String durationUnit = TimeUnit.NANOSECONDS.toString().toLowerCase(Locale.US);
private final long durationFactor = TimeUnit.NANOSECONDS.toNanos(1);
public ConsoleReporter(NBComponent node, NBLabels extraLabels, int seconds, boolean oneLastTime,
public ConsoleReporter(NBComponent node, NBLabels extraLabels, long millis, boolean oneLastTime,
PrintStream output, Set<MetricAttribute> disabledMetricAttributes) {
super(node, extraLabels, seconds, oneLastTime);
super(node, extraLabels, millis, oneLastTime);
this.output = output;
this.dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT,
DateFormat.MEDIUM,

View File

@@ -18,7 +18,8 @@
package io.nosqlbench.api.engine.metrics.reporters;
import com.codahale.metrics.*;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.api.engine.metrics.instruments.*;
import io.nosqlbench.api.labels.NBLabelUtils;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.PeriodicTaskComponent;
@@ -26,9 +27,14 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -52,10 +58,11 @@ public class CsvReporter extends PeriodicTaskComponent {
private final long durationFactor;
private final long rateFactor;
private final NBComponent component;
private Map<Path, PrintWriter> outstreams = new HashMap<>();
public CsvReporter(NBComponent node, Path reportTo, int interval, MetricInstanceFilter filter,
public CsvReporter(NBComponent node, Path reportTo, long intervalMs, MetricInstanceFilter filter,
NBLabels extraLabels) {
super(node, extraLabels, interval, false);
super(node, extraLabels, intervalMs, false);
this.component = node;
this.reportTo = reportTo;
this.filter = filter;
@@ -68,30 +75,57 @@ public class CsvReporter extends PeriodicTaskComponent {
this.timerHeader = String.join(separator, "count", "max", "mean", "min", "stddev", "p50", "p75", "p95", "p98", "p99", "p999", "mean_rate", "m1_rate", "m5_rate", "m15_rate", "rate_unit", "duration_unit");
this.meterHeader = String.join(separator, "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate", "rate_unit");
this.histogramHeader = String.join(separator, "count", "max", "mean", "min", "stddev", "p50", "p75", "p95", "p98", "p99", "p999");
if (Files.exists(reportTo) && !Files.isDirectory(reportTo)) {
throw new RuntimeException(reportTo.toString() + " already exists and is not a directory.");
}
if (!Files.exists(reportTo)) {
try {
Files.createDirectories(reportTo, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---")));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public CsvReporter(NBComponent node, Path reportTo, int interval, MetricInstanceFilter filter) {
this(node, reportTo, interval, filter, null);
public CsvReporter(NBComponent node, Path reportTo, long intervalMs, MetricInstanceFilter filter) {
this(node, reportTo, intervalMs, filter, null);
}
public void start() {
@Override
public void task() {
List<NBMetric> metrics = component.find().metrics();
final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());
NBLabels commonLabels = NBLabelUtils.commonLabels(metrics);
logger.info("Factoring out common labels for CSV metrics logging: " + commonLabels.linearizeAsMetrics());
for (NBMetric metric : metrics) {
if (metric instanceof Gauge<?>) {
reportGauge(timestamp, metric.getLabels().linearizeAsMetrics(), (Gauge<?>) metric);
} else if (metric instanceof Counter) {
reportCounter(timestamp, metric.getLabels().linearizeAsMetrics(), (Counter) metric);
} else if (metric instanceof Histogram) {
reportHistogram(timestamp, metric.getLabels().linearizeAsMetrics(), (Histogram) metric);
} else if (metric instanceof Meter) {
reportMeter(timestamp, metric.getLabels().linearizeAsMetrics(), (Meter) metric);
} else if (metric instanceof Timer) {
reportTimer(timestamp, metric.getLabels().linearizeAsMetrics(), (Timer) metric);
String name = metric.getLabels().difference(commonLabels).linearize_bare("scenario","activity","name");
// metric.getLabels().difference(commonLabels);
switch (metric) {
case NBMetricGauge gauge:
reportGauge(timestamp, name, gauge);
break;
case NBMetricCounter counter:
reportCounter(timestamp, name, counter);
break;
case NBMetricHistogram histogram:
reportHistogram(timestamp, name, histogram);
break;
case NBMetricTimer timer:
reportTimer(timestamp, name, timer);
break;
case NBMetricMeter meter:
reportMeter(timestamp, name, meter);
break;
default:
throw new RuntimeException("Unrecognized metric type to report '" + metric.getClass().getSimpleName() + "'");
}
}
}
protected double convertDuration(double duration) {
return duration / durationFactor;
}
@@ -168,35 +202,35 @@ public class CsvReporter extends PeriodicTaskComponent {
}
private void report(long timestamp, String name, String header, String line, Object... values) {
try {
final File file = new File(reportTo + ".csv");
final boolean fileAlreadyExists = file.exists();
if (fileAlreadyExists || file.createNewFile()) {
try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
new FileOutputStream(file, true), UTF_8))) {
if (!fileAlreadyExists) {
out.println("t" + separator + header);
}
out.printf(locale, String.format(locale, "%d" + separator + "%s%n", timestamp, line), values);
}
}
} catch (IOException e) {
logger.warn("Error writing to {}", name, e);
}
Path pathname = reportTo.resolve(Path.of(name + ".csv")).normalize();
PrintWriter out = outstreams.computeIfAbsent(pathname, p -> createWriter(p, "t" + separator + header));
out.printf(locale, String.format(locale, "%d" + separator + "%s%n", timestamp, line), values);
out.flush();
}
protected String sanitize(String fileName) {
//TODO: sanitize file name
return fileName;
private PrintWriter createWriter(Path path, String firstline) {
try {
boolean addHeader = !Files.exists(path);
if (!Files.exists(path)) {
addHeader = true;
Files.createFile(path, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxr--")));
Files.writeString(path, firstline);
}
BufferedWriter buf = Files.newBufferedWriter(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
PrintWriter out = new PrintWriter(buf);
if (addHeader) {
out.println(firstline);
}
return out;
} catch (IOException e) {
logger.warn("Error writing to {}", path, e);
throw new RuntimeException(e);
}
}
public void teardown() {
super.teardown();
}
@Override
public void task() {
this.start();
}
}

View File

@@ -49,9 +49,9 @@ public class Log4JMetricsReporter extends PeriodicTaskComponent {
final Marker marker,
final MetricFilter filter,
final NBLabels extraLabels,
final int seconds,
final long millis,
final boolean oneLastTime) {
super(component, extraLabels, seconds, oneLastTime);
super(component, extraLabels, millis, oneLastTime);
this.loggerProxy = loggerProxy;
this.marker = marker;
}

View File

@@ -49,11 +49,11 @@ public class PromPushReporterComponent extends PeriodicTaskComponent implements
public PromPushReporterComponent(
final String targetUriSpec,
final String config,
int seconds,
long millis,
NBComponent component,
NBLabels labels
) {
super(component, labels, seconds, true);
super(component, labels, millis, true);
uri = URI.create(targetUriSpec);
needsAuth = false;

View File

@@ -16,8 +16,6 @@
package io.nosqlbench.api.labels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.*;
@@ -92,6 +90,31 @@ public class MapLabels implements NBLabels {
return sb.toString();
}
@Override
public String linearize_bare(String... barewords) {
StringBuilder sb = new StringBuilder();
Set<String> keyset = new HashSet<>(labels.keySet());
for (String bareword : barewords) {
if (keyset.contains(bareword)) {
keyset.remove(bareword);
sb.append(labels.get(bareword)).append("__");
}
}
if (!sb.isEmpty()) {
sb.setLength(sb.length()-"__".length());
}
List<String> keys = new ArrayList<>(keyset);
if (!keys.isEmpty()) {
Collections.sort(keys);
for (String key : keys) {
sb.append("_").append(key).append("_").append(labels.get(key)).append("__");
}
sb.setLength(sb.length()-"__".length());
}
return sb.toString();
}
@Override
public String linearize(String bareName, String... included) {
final StringBuilder sb = new StringBuilder();
@@ -105,7 +128,7 @@ public class MapLabels implements NBLabels {
if (null == rawName) throw new RuntimeException("Unable to get value for key '" + bareName + '\'');
sb.append(rawName);
}
if (0 < includedNames.size()) {
if (1 < includedNames.size()) {
sb.append('{');
for (final String includedName : includedNames) {
if (includedName.equals(bareName)) continue;
@@ -116,9 +139,9 @@ public class MapLabels implements NBLabels {
.append(includedValue)
.append('"')
.append(',');
sb.setLength(sb.length()-",".length());
sb.append('}');
}
sb.setLength(sb.length()-",".length());
sb.append('}');
}
return sb.toString();
@@ -141,6 +164,23 @@ public class MapLabels implements NBLabels {
}
@Override
public String linearizeAsKvString() {
if (labels.isEmpty()) {
return "EMPTY";
}
StringBuilder sb = new StringBuilder("");
ArrayList<String> keys = new ArrayList<>(this.labels.keySet());
Collections.sort(keys);
for (String key : keys) {
sb.append(key).append("=").append(labels.get(key)).append(",");
}
sb.setLength(sb.length()-",".length());
return sb.toString();
}
public static String sanitize(String word) {
String sanitized = word;
sanitized = sanitized.replaceAll("\\.", "__");
@@ -256,4 +296,45 @@ public class MapLabels implements NBLabels {
public int hashCode() {
return labels != null ? labels.hashCode() : 0;
}
/**
* Take the intersection of the two label sets, considering both key
* and value for each label entry. If both have the same label name
* but different values for it, then that label is not considered
* common and it is not retained in the intersection.
* @param otherLabels The label set to intersect
*/
@Override
public NBLabels intersection(NBLabels otherLabels) {
Map<String, String> other = otherLabels.asMap();
Map<String,String> common = new LinkedHashMap<>();
asMap().forEach((k,v) -> {
if (other.containsKey(k) && other.get(k).equals(v)) {
common.put(k,v);
}
});
return NBLabels.forMap(common);
}
/**
* Subtract all matching labels from the other label set from this one,
* considering label names and values. If the other label set contains
* the same name but a different value, then it is not considered a
* match and thus not removed from the labels of this element.
* @param otherLabels Labels to remove, where key and value matches
* @return The same, or a smaller set of labels for this element
*/
@Override
public NBLabels difference(NBLabels otherLabels) {
Map<String, String> other = otherLabels.asMap();
NBLabels difference = NBLabels.forKV();
for (String key : labels.keySet()) {
if (!other.containsKey(key) || !other.get(key).equals(labels.get(key))) {
difference = difference.and(key,labels.get(key));
}
}
return difference;
}
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.labels;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class NBLabelUtils {
public static NBLabels common(List<NBLabels> labelsets) {
if (labelsets.isEmpty()) {
return NBLabels.forKV();
}
NBLabels common = labelsets.getFirst();
for (NBLabels labelset : labelsets) {
common = common.intersection(labelset);
}
return common;
}
public static <T extends NBLabeledElement> NBLabels commonLabels(List<T> elements) {
return common(elements.stream().map(NBLabeledElement::getLabels).collect(Collectors.toList()));
}
}

View File

@@ -67,6 +67,8 @@ public interface NBLabels {
return this.linearizeValues('.', included);
}
String linearize_bare(String... barewords);
/**
* Render a string representation of the label set according to the prometheus exposition naming format.
* This means that a label set which includes the JSON data:
@@ -135,6 +137,10 @@ public interface NBLabels {
*/
NBLabels modifyValue(String labelName, Function<String,String> transform);
String linearizeAsMetrics();
String linearizeAsKvString();
/**
* Create a new NBLabels value with the additional keys and values appended.
*
@@ -175,6 +181,9 @@ public interface NBLabels {
* like <PRE>{@code {__name__="metric_family_name",k="20"}}</PRE>
* @return a String
*/
String linearizeAsMetrics();
NBLabels intersection(NBLabels labelset);
NBLabels difference(NBLabels otherLabels);
}

View File

@@ -32,8 +32,8 @@ import java.util.List;
public class AttachedMetricsSummaryReporter extends PeriodicTaskComponent {
private final static Logger logger = LogManager.getLogger(AttachedMetricsPushReporter.class);
public AttachedMetricsSummaryReporter(NBComponent node, NBLabels extraLabels, int seconds) {
super(node, extraLabels, seconds, true);
public AttachedMetricsSummaryReporter(NBComponent node, NBLabels extraLabels, long millis) {
super(node, extraLabels, millis, true);
}
public void task() {

View File

@@ -119,10 +119,10 @@ public class NBCreators {
return histogram;
}
public AttachedMetricsSummaryReporter summaryReporter(int seconds, String... labelspecs) {
public AttachedMetricsSummaryReporter summaryReporter(long millis, String... labelspecs) {
logger.debug("attaching summary reporter to " + base.description());
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, seconds);
AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, millis);
return reporter;
}
// public AttachedMetricCsvReporter csvReporter(int seconds, String dirpath, String... labelspecs) {
@@ -131,9 +131,9 @@ public class NBCreators {
// AttachedMetricCsvReporter reporter = new AttachedMetricCsvReporter(base, extraLabels, Path.of(dirpath), seconds);
// return reporter;
// }
public PromPushReporterComponent pushReporter(String targetUri, int seconds, String config, String... labelspecs) {
public PromPushReporterComponent pushReporter(String targetUri, long millis, String config, String... labelspecs) {
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
PromPushReporterComponent reporter = new PromPushReporterComponent(targetUri, config, seconds, base,extraLabels);
PromPushReporterComponent reporter = new PromPushReporterComponent(targetUri, config, millis, base,extraLabels);
return reporter;
}
@@ -169,7 +169,7 @@ public class NBCreators {
private MetricFilter filter= new MetricInstanceFilter();
private boolean oneLastTime = false;
private NBLabels labels;
private int interval = 1;
private long millis = 1000;
public Log4jReporterBuilder(NBComponent component) {
this.component = component;
@@ -179,7 +179,7 @@ public class NBCreators {
return this;
}
public Log4jReporterBuilder interval(final int interval) {
this.interval = interval;
this.millis = interval;
return this;
}
public Log4jReporterBuilder outputTo(final Logger logger) {
@@ -210,7 +210,7 @@ public class NBCreators {
case ERROR -> new ErrorLoggerProxy(this.logger);
default -> new DebugLoggerProxy(this.logger);
};
return new Log4JMetricsReporter(this.component, loggerProxy, this.marker, this.filter, this.labels, this.interval, this.oneLastTime);
return new Log4JMetricsReporter(this.component, loggerProxy, this.marker, this.filter, this.labels, this.millis, this.oneLastTime);
}
}
/* private class to allow logger configuration */
@@ -316,7 +316,7 @@ public class NBCreators {
private final NBComponent component;
private final PrintStream output;
private NBLabels labels = null;
private int interval = 1;
private long interval = 1000;
private boolean oneLastTime = false;
private Set<MetricAttribute> disabledMetricAttributes = Set.of();

View File

@@ -28,7 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class PeriodicTaskComponent extends NBBaseComponent implements Runnable {
private static final Logger logger = LogManager.getLogger(PeriodicTaskComponent.class);
private final int intervalSeconds;
private final long intervalmillis;
private final Lock lock = new ReentrantLock();
private final Condition shutdownSignal = lock.newCondition();
private final boolean oneLastTime;
@@ -39,23 +39,24 @@ public abstract class PeriodicTaskComponent extends NBBaseComponent implements R
public PeriodicTaskComponent(
NBComponent node,
NBLabels extraLabels,
int seconds,
long millis,
boolean oneLastTime
) {
super(node, extraLabels);
this.intervalSeconds = seconds;
this.intervalmillis = millis;
thread = Thread.ofVirtual().start(this);
this.oneLastTime=oneLastTime;
this.oneLastTime = oneLastTime;
}
protected abstract void task();
@Override
public void run() {
long now = System.currentTimeMillis();
long reportAt = now + intervalSeconds * 1000L;
long reportAt = now + intervalmillis;
long waitfor = reportAt - now;
while (true) {
while (running) {
while (running && waitfor > 0) {
boolean signalReceived = false;
try {
@@ -77,12 +78,14 @@ public abstract class PeriodicTaskComponent extends NBBaseComponent implements R
task();
} catch (Exception e) {
logger.error(e);
throw new RuntimeException(e);
} finally {
reportAt = reportAt + (intervalSeconds * 1000L);
reportAt = reportAt + (intervalmillis);
now = System.currentTimeMillis();
waitfor = reportAt - now;
}
}
logger.info("shutting down periodic runnable component: " + description());
}
public void teardown() {

View File

@@ -52,7 +52,7 @@ class AttachedMetricsSummaryReporterTest {
scope.add(root);
TestComponent l1 = new TestComponent(root, "l1", "l1");
NBMetricCounter counter = l1.create().counter("mycounter");
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(1);
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(1000);
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);
@@ -79,7 +79,7 @@ class AttachedMetricsSummaryReporterTest {
TestComponent root = new TestComponent("root", "root");
TestComponent l1 = new TestComponent(root, "l1", "l1");
NBMetricCounter counter = l1.create().counter("mycounter");
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(5);
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(5000);
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);