incremental progress

This commit is contained in:
Jonathan Shook 2023-10-02 16:25:10 -05:00
parent b0e984c55f
commit b130caf154
27 changed files with 1146 additions and 11 deletions

View File

@ -179,6 +179,7 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
instrument = pop.takeStaticConfigOr("instrument", false); instrument = pop.takeStaticConfigOr("instrument", false);
if (this.instrument) { if (this.instrument) {
final int hdrDigits = pop.getStaticConfigOr("hdr_digits", 4).intValue(); final int hdrDigits = pop.getStaticConfigOr("hdr_digits", 4).intValue();
successTimer = ActivityMetrics.timer(pop, ActivityMetrics.sanitize("successfor_"+getOpName()), hdrDigits); successTimer = ActivityMetrics.timer(pop, ActivityMetrics.sanitize("successfor_"+getOpName()), hdrDigits);
errorTimer = ActivityMetrics.timer(pop, ActivityMetrics.sanitize("errorsfor_"+getOpName()), hdrDigits); errorTimer = ActivityMetrics.timer(pop, ActivityMetrics.sanitize("errorsfor_"+getOpName()), hdrDigits);
} }

View File

@ -0,0 +1,159 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
public class ComponentActivityInstrumentation implements ActivityInstrumentation {
private static final String STRICTMETRICNAMES = "strictmetricnames";
private static final String WAIT_TIME = "_waittime";
private static final String SERVICE_TIME = "_servicetime";
private static final String RESPONSE_TIME = "_responsetime";
private final Activity activity;
private final ActivityDef def;
private final ParameterMap params;
private final int hdrdigits;
private NBMetricTimer readInputTimer;
private NBMetricTimer stridesServiceTimer;
private NBMetricTimer stridesResponseTimer;
private NBMetricTimer cyclesServiceTimer;
private NBMetricTimer cyclesResponseTimer;
private NBMetricCounter pendingOpsCounter;
private NBMetricCounter opTrackerBlockedCounter;
private NBMetricTimer bindTimer;
private NBMetricTimer executeTimer;
private NBMetricTimer resultTimer;
private NBMetricTimer resultSuccessTimer;
private NBMetricHistogram triesHistogram;
private NBMetricTimer verifierTimer;
public ComponentActivityInstrumentation(final Activity activity) {
this.activity = activity;
def = activity.getActivityDef();
params = this.def.getParams();
hdrdigits = activity.getHdrDigits();
initMetrics();
}
private void initMetrics() {
readInputTimer=activity.create().timer("read_input",this.hdrdigits);
stridesServiceTimer=activity.create().timer("strides",this.hdrdigits);
if (null != activity.getStrideLimiter()) {
this.stridesResponseTimer = activity.create().timer(
"strides" + ComponentActivityInstrumentation.RESPONSE_TIME,
hdrdigits
);
}
this.cyclesServiceTimer = activity.create().timer(
"cycles"+ComponentActivityInstrumentation.SERVICE_TIME,
hdrdigits
);
if (null != activity.getCycleLimiter()) {
this.cyclesResponseTimer = activity.create().timer(
"cycles" + ComponentActivityInstrumentation.RESPONSE_TIME,
hdrdigits
);
}
this.pendingOpsCounter=activity.create().counter("pending_ops");
this.opTrackerBlockedCounter=activity.create().counter("optracker_blocked");
this.bindTimer = activity.create().timer("bind",hdrdigits);
this.executeTimer = activity.create().timer("execute",hdrdigits);
this.resultTimer = activity.create().timer("result",hdrdigits);
this.resultSuccessTimer = activity.create().timer("result_success",hdrdigits);
this.triesHistogram = activity.create().histogram("tries",hdrdigits);
this.verifierTimer = activity.create().timer("verifier",hdrdigits);
}
@Override
public Timer getOrCreateInputTimer() {
return readInputTimer;
}
@Override
public Timer getOrCreateStridesServiceTimer() {
return stridesServiceTimer;
}
@Override
public Timer getStridesResponseTimerOrNull() {
return stridesResponseTimer;
}
@Override
public Timer getOrCreateCyclesServiceTimer() {
return cyclesServiceTimer;
}
@Override
public Timer getCyclesResponseTimerOrNull() {
return cyclesResponseTimer;
}
@Override
public Counter getOrCreatePendingOpCounter() {
return pendingOpsCounter;
}
@Override
public Counter getOrCreateOpTrackerBlockedCounter() {
return opTrackerBlockedCounter;
}
@Override
public Timer getOrCreateBindTimer() {
return bindTimer;
}
@Override
public Timer getOrCreateExecuteTimer() {
return executeTimer;
}
@Override
public Timer getOrCreateResultTimer() {
return resultTimer;
}
@Override
public Timer getOrCreateResultSuccessTimer() {
return resultSuccessTimer;
}
@Override
public Histogram getOrCreateTriesHistogram() {
return triesHistogram;
}
@Override
public Timer getOrCreateVerifierTimer() {
return verifierTimer;
}
}

View File

@ -283,7 +283,8 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
@Override @Override
public synchronized ActivityInstrumentation getInstrumentation() { public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) { if (null == this.activityInstrumentation) {
activityInstrumentation = new CoreActivityInstrumentation(this); activityInstrumentation = new ComponentActivityInstrumentation(this);
// activityInstrumentation = new CoreActivityInstrumentation(this);
} }
return activityInstrumentation; return activityInstrumentation;
} }

View File

@ -446,6 +446,7 @@
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId> <artifactId>junit-jupiter</artifactId>
<version>5.9.2</version> <version>5.9.2</version>
<scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapters.api.util;
import io.nosqlbench.api.engine.util.Tagged; import io.nosqlbench.api.engine.util.Tagged;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import java.util.*; import java.util.*;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -110,6 +111,10 @@ public class TagFilter {
.filter(l -> this.matches(l.getLabels().asMap()).matched()) .filter(l -> this.matches(l.getLabels().asMap()).matched())
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
public boolean matchesLabeled(NBComponent c) {
return this.matches(c.getLabels().asMap()).matched();
}
public <T extends Tagged> List<String> filterLog(List<T> tagged) { public <T extends Tagged> List<String> filterLog(List<T> tagged) {
return tagged.stream() return tagged.stream()
@ -118,6 +123,7 @@ public class TagFilter {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private enum Conjugate { private enum Conjugate {
any((i,j) -> (j>0)), any((i,j) -> (j>0)),
all((i,j) -> (i.intValue()==j.intValue())), all((i,j) -> (i.intValue()==j.intValue())),

View File

@ -46,4 +46,11 @@ public class NBFunctionGauge implements NBMetricGauge<Double> {
public NBLabels getLabels() { public NBLabels getLabels() {
return parent.getLabels().and(this.labels); return parent.getLabels().and(this.labels);
} }
@Override
public String toString() {
return description();
}
} }

View File

@ -20,4 +20,7 @@ import com.codahale.metrics.Metric;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.labels.NBLabeledElement;
public interface NBMetric extends Metric, NBLabeledElement { public interface NBMetric extends Metric, NBLabeledElement {
default String getHandle() {
return this.getLabels().linearizeAsMetrics();
}
} }

View File

@ -70,7 +70,6 @@ public class NBMetricTimer extends Timer implements DeltaSnapshotter, HdrDeltaHi
return timer; return timer;
} }
@Override @Override
public Histogram getNextHdrDeltaHistogram() { public Histogram getNextHdrDeltaHistogram() {
return deltaHdrHistogramReservoir.getNextHdrHistogram(); return deltaHdrHistogramReservoir.getNextHdrHistogram();
@ -89,6 +88,6 @@ public class NBMetricTimer extends Timer implements DeltaSnapshotter, HdrDeltaHi
@Override @Override
public String toString() { public String toString() {
return "NBTIMER:"+this.getLabels().toString(); return description();
} }
} }

View File

@ -20,6 +20,7 @@ import io.nosqlbench.api.config.params.ParamsParser;
import io.nosqlbench.api.config.standard.*; import io.nosqlbench.api.config.standard.*;
import com.codahale.metrics.*; import com.codahale.metrics.*;
import io.nosqlbench.components.NBComponent;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;

View File

@ -0,0 +1,241 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.api.engine.metrics.reporters.PromExpositionFormat;
import io.nosqlbench.api.engine.metrics.reporters.PromPushKeyFileReader;
import io.nosqlbench.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AttachedMetricsPushReporter extends NBBaseComponent implements NBConfigurable, Runnable {
private static final Logger logger = LogManager.getLogger(AttachedMetricsPushReporter.class);
private final int intervalSeconds;
private HttpClient client;
private final URI uri;
private String bearerToken;
private boolean needsAuth;
private Lock lock = new ReentrantLock(false);
private Condition shutdownSignal = lock.newCondition();
public AttachedMetricsPushReporter(
final String targetUriSpec,
NBComponent node,
int seconds,
NBLabels extraLabels
) {
super(node, extraLabels);
this.intervalSeconds = seconds;
uri = URI.create(targetUriSpec);
needsAuth = false;
String config = "";
ConfigLoader loader = new ConfigLoader();
List<Map> configs = loader.load(config, Map.class);
NBConfigModel cm = this.getConfigModel();
if (configs != null) {
logger.info("PromPushReporter process configuration: %s", config);
for (Map cmap : configs) {
NBConfiguration cfg = cm.apply(cmap);
this.applyConfig(cfg);
}
} else {
logger.info("PromPushReporter default configuration");
HashMap<String, String> junk = new HashMap<>(Map.of());
NBConfiguration cfg = cm.apply(junk);
this.applyConfig(cfg);
}
Thread.ofVirtual().start(this);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(Param.defaultTo("apikeyfile", "$NBSTATEDIR/prompush/prompush_apikey")
.setDescription("The file that contains the api key, supersedes apikey"))
.add(Param.optional("apikey", String.class)
.setDescription("The api key to use"))
.asReadOnly();
}
@Override
public void applyConfig(NBConfiguration cfg) {
Path keyfilePath = null;
Optional<String> optionalApikeyfile = cfg.getEnvOptional("apikeyfile");
Optional<String> optionalApikey = cfg.getOptional("apikey");
bearerToken = null;
if (optionalApikeyfile.isPresent()) {
keyfilePath = optionalApikeyfile.map(Path::of).orElseThrow();
if (Files.isRegularFile(keyfilePath)) {
logger.info("Reading Bearer Token from %s", keyfilePath);
PromPushKeyFileReader keyfile = new PromPushKeyFileReader(keyfilePath);
bearerToken = keyfile.get();
}
} else if (optionalApikey.isPresent()) {
bearerToken = optionalApikey.get();
}
needsAuth = (null != bearerToken);
bearerToken = "Bearer " + bearerToken;
}
public synchronized void report() {
final Clock nowclock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
StringBuilder sb = new StringBuilder(1024 * 1024); // 1M pre-allocated to reduce heap churn
List<NBMetric> metrics = new ArrayList<>();
Iterator<NBComponent> allMetrics = NBComponentTraversal.traverseBreadth(getParent());
allMetrics.forEachRemaining(m -> metrics.addAll(m.findMetrics("")));
int total = 0;
for (NBMetric metric : metrics) {
sb = PromExpositionFormat.format(nowclock, sb, metric);
total++;
}
AttachedMetricsPushReporter.logger.debug("formatted {} metrics in prom expo format", total);
final String exposition = sb.toString();
logger.trace(() -> "prom exposition format:\n" + exposition);
final double backoffRatio = 1.5;
final double maxBackoffSeconds = 10;
double backOff = 1.0;
final int maxRetries = 5;
int remainingRetries = maxRetries;
final List<Exception> errors = new ArrayList<>();
boolean succeeded = false;
while (0 < remainingRetries) {
remainingRetries--;
final HttpClient client = getCachedClient();
final HttpRequest.Builder rb = HttpRequest.newBuilder().uri(uri);
if (needsAuth) {
rb.setHeader("Authorization", bearerToken);
}
final HttpRequest request = rb.POST(BodyPublishers.ofString(exposition)).build();
final BodyHandler<String> handler = HttpResponse.BodyHandlers.ofString();
HttpResponse<String> response = null;
try {
response = client.send(request, handler);
final int status = response.statusCode();
if ((200 > status) || (300 <= status)) {
final String errmsg = "status " + response.statusCode() + " while posting metrics to '" + this.uri + '\'';
throw new RuntimeException(errmsg);
}
AttachedMetricsPushReporter.logger.debug("posted {} metrics to prom push endpoint '{}'", total, this.uri);
succeeded = true;
break;
} catch (final Exception e) {
errors.add(e);
try {
Thread.sleep((int) backOff * 1000L);
} catch (final InterruptedException ignored) {
}
backOff = Math.min(maxBackoffSeconds, backOff * backoffRatio);
}
}
if (!succeeded) {
AttachedMetricsPushReporter.logger.error("Failed to send push prom metrics after {} tries. Errors follow:", maxRetries);
for (final Exception error : errors) AttachedMetricsPushReporter.logger.error(error);
}
}
private synchronized HttpClient getCachedClient() {
if (null == client) this.client = this.getNewClient();
return this.client;
}
private synchronized HttpClient getNewClient() {
this.client = HttpClient.newBuilder()
.followRedirects(Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(60))
.version(Version.HTTP_2)
.build();
return this.client;
}
@Override
public void run() {
long now = System.currentTimeMillis();
long reportAt = now + intervalSeconds * 1000L;
long waitfor = reportAt - now;
loop:
while (true) {
while (waitfor > 0) {
try {
if (shutdownSignal.await(waitfor, TimeUnit.MILLISECONDS)) {
logger.debug("shutting down " + this);
break loop;
}
now = System.currentTimeMillis();
waitfor = now - reportAt;
} catch (InterruptedException ignored) {
}
logger.info("reporting metrics via push");
try {
report();
} catch (Exception e) {
logger.error(e);
} finally {
reportAt = now;
now = System.currentTimeMillis();
waitfor = now - reportAt;
}
}
}
logger.info("reporter thread shutting down");
}
@Override
public void beforeDetach() {
this.shutdown();
}
private void shutdown() {
logger.debug("shutting down " + this);
lock.lock();
shutdownSignal.signal();
lock.unlock();
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.api.engine.metrics.reporters.PromExpositionFormat;
import io.nosqlbench.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class AttachedMetricsSummaryReporter extends PeriodicTaskComponent {
private final static Logger logger = LogManager.getLogger(AttachedMetricsPushReporter.class);
public AttachedMetricsSummaryReporter(NBComponent node, NBLabels extraLabels, int seconds) {
super(node, extraLabels, seconds, true);
}
public void task() {
final Clock nowclock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
StringBuilder sb = new StringBuilder(1024 * 1024); // 1M pre-allocated to reduce heap churn
List<NBMetric> metrics = new ArrayList<>();
Iterator<NBComponent> allMetrics = NBComponentTraversal.traverseBreadth(getParent());
allMetrics.forEachRemaining(m -> metrics.addAll(m.findMetrics("")));
int total = 0;
for (NBMetric metric : metrics) {
sb = PromExpositionFormat.format(nowclock, sb, metric);
total++;
}
AttachedMetricsSummaryReporter.logger.debug("formatted {} metrics in prom expo format", total);
final String exposition = sb.toString();
logger.info(() -> "prom exposition format:\n" + exposition);
}
}

View File

@ -103,4 +103,41 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
public void beforeDetach() { public void beforeDetach() {
logger.debug("before detach " + description()); logger.debug("before detach " + description());
} }
@Override
public final void close() throws RuntimeException {
try {
logger.debug("cleaning up");
ArrayList<NBComponent> children = new ArrayList<>(getChildren());
for (NBComponent child : children) {
child.close();
}
teardown();
} catch (Exception e) {
logger.error(e);
} finally {
logger.debug("detaching " + description());
if (parent!=null) {
parent.detachChild(this);
}
}
}
/**
* Override this method in your component implementations when you need to do something
* to close out your component.
*/
protected void teardown() {
logger.debug("tearing down " + description());
}
@Override
public NBBuilders create() {
return new NBBuilders(this);
}
@Override
public NBFinders find() {
return new NBFinders(this);
}
} }

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class NBBuilders {
private final Logger logger = LogManager.getLogger(NBBuilders.class);
private final NBBaseComponent base;
public NBBuilders(NBBaseComponent base) {
this.base = base;
}
public NBMetricTimer timer(String metricFamilyName, int hdrdigits) {
NBLabels labels = base.getLabels().and("name", metricFamilyName);
NBMetricTimer timer = new NBMetricTimer(labels, new DeltaHdrHistogramReservoir(labels, hdrdigits));
base.addMetric(timer);
return timer;
}
public NBMetricCounter counter(String metricFamilyName) {
NBLabels labels = base.getLabels().and("name", metricFamilyName);
NBMetricCounter counter = new NBMetricCounter(labels);
base.addMetric(counter);
return counter;
}
public NBFunctionGauge gauge(String metricFamilyName, Supplier<Double> valueSource) {
NBFunctionGauge gauge = new NBFunctionGauge(base, valueSource, metricFamilyName);
base.addMetric(gauge);
return gauge;
}
public NBMetricHistogram histogram(String metricFamilyName, int hdrdigits) {
NBLabels labels = base.getLabels().and("name", metricFamilyName);
NBMetricHistogram histogram = new NBMetricHistogram(labels, new DeltaHdrHistogramReservoir(labels, hdrdigits));
base.addMetric(histogram);
return histogram;
}
public AttachedMetricsSummaryReporter summaryReporter(int seconds, String... labelspecs) {
logger.debug("attaching summary reporter to " + base.description());
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, seconds);
return reporter;
}
public AttachedMetricsPushReporter pushReporter(String targetUri, int seconds,String... labelspecs) {
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
AttachedMetricsPushReporter reporter = new AttachedMetricsPushReporter(targetUri, base, seconds, extraLabels);
return reporter;
}
}

View File

@ -31,9 +31,9 @@ import java.util.List;
* <LI>Addressable - Each component has a set of metadata which allows it to be identified clearly under its parent.</LI> * <LI>Addressable - Each component has a set of metadata which allows it to be identified clearly under its parent.</LI>
* </UL> * </UL>
* *
* This interface will start as a tagging interface, but will eventually include aspects of above by extension. * This interface includes more aspects of above by extension going forward.
*/ */
public interface NBComponent extends NBLabeledElement, NBComponentMetrics, NBMetricsQuery { public interface NBComponent extends AutoCloseable, NBLabeledElement, NBComponentMetrics, NBMetricsQuery, NBComponentServices {
NBComponent EMPTY_COMPONENT = new NBBaseComponent(null); NBComponent EMPTY_COMPONENT = new NBBaseComponent(null);
@ -46,4 +46,7 @@ public interface NBComponent extends NBLabeledElement, NBComponentMetrics, NBMet
List<NBComponent> getChildren(); List<NBComponent> getChildren();
default void beforeDetach() {} default void beforeDetach() {}
@Override
void close() throws RuntimeException;
} }

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.adapters.api.util.TagFilter;
import java.util.*;
public class NBComponentFinder {
public static List<NBComponent> findComponents(String pattern, NBComponent startNode) {
TagFilter filter = new TagFilter(pattern);
List<NBComponent> found = new ArrayList<>();
Iterator<NBComponent> nbComponentIterator = NBComponentTraversal.traverseDepth(startNode);
nbComponentIterator.forEachRemaining(c -> {
if (filter.matchesLabeled(c)) {
found.add(c);
}
});
return found;
}
public static NBComponent findOneComponent(String pattern, NBComponent startNode) {
List<NBComponent> found = findComponents(pattern, startNode);
if (found.size()!=1) {
throw new RuntimeException("Expected exactly 1 componet, but found " + found.size()+": for '" + pattern + "'");
}
return found.get(0);
}
}

View File

@ -46,5 +46,4 @@ public interface NBComponentMetrics {
return found.get(0); return found.get(0);
} }
} }

View File

@ -0,0 +1,24 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
public interface NBComponentServices {
public NBBuilders create();
public NBFinders find();
}

View File

@ -16,16 +16,30 @@
package io.nosqlbench.components; package io.nosqlbench.components;
import io.nosqlbench.api.config.standard.TestComponent;
public class NBComponentSubScope implements AutoCloseable { public class NBComponentSubScope implements AutoCloseable {
private final NBComponent component; private NBComponent[] components;
public NBComponentSubScope(NBComponent component) { public NBComponentSubScope(NBComponent... components) {
this.component = component; this.components = components;
} }
@Override @Override
public void close() throws RuntimeException { public void close() throws RuntimeException {
component.beforeDetach(); for (NBComponent component : components) {
component.getParent().detachChild(component); component.beforeDetach();
NBComponent parent = component.getParent();
if (parent!=null) {
parent.detachChild(component);
}
}
}
public void add(TestComponent... adding) {
NBComponent[] newAry = new NBComponent[components.length+adding.length];
System.arraycopy(components,0,newAry,0,components.length);
System.arraycopy(adding,0,newAry,components.length,adding.length);
this.components = newAry;
} }
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
public class NBFinders {
private final NBBaseComponent base;
public NBFinders(NBBaseComponent base) {
this.base = base;
}
public NBMetric metric(String pattern) {
NBMetric metric = base.lookupMetricInTree(pattern);
if (metric!=null) { return metric; };
metric = base.findOneMetricInTree(pattern);
return metric;
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public abstract class PeriodicTaskComponent extends NBBaseComponent implements Runnable {
private static final Logger logger = LogManager.getLogger(PeriodicTaskComponent.class);
private final int intervalSeconds;
private final Lock lock = new ReentrantLock();
private final Condition shutdownSignal = lock.newCondition();
private final boolean oneLastTime;
Thread thread;
private boolean running = true;
public PeriodicTaskComponent(
NBComponent node,
NBLabels extraLabels,
int seconds,
boolean oneLastTime
) {
super(node, extraLabels);
this.intervalSeconds = seconds;
thread = Thread.ofVirtual().start(this);
this.oneLastTime=oneLastTime;
}
protected abstract void task();
@Override
public void run() {
long now = System.currentTimeMillis();
long reportAt = now + intervalSeconds * 1000L;
long waitfor = reportAt - now;
while (true) {
while (running && waitfor > 0) {
boolean signalReceived = false;
try {
lock.lock();
signalReceived = shutdownSignal.await(waitfor, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
if (signalReceived) {
logger.debug("signal shutting down " + this);
return;
}
now = System.currentTimeMillis();
waitfor = reportAt - now;
}
logger.info("summarizing metrics to console");
try {
task();
} catch (Exception e) {
logger.error(e);
} finally {
reportAt = reportAt + (intervalSeconds * 1000L);
now = System.currentTimeMillis();
waitfor = reportAt - now;
}
}
}
public void teardown() {
logger.debug("shutting down " + this);
lock.lock();
running = false;
shutdownSignal.signalAll();
lock.unlock();
logger.debug("signaled reporter thread to shut down " + description());
try {
thread.join();
} catch (InterruptedException e) {
logger.warn("interrupted while joining thread");
}
if (oneLastTime) {
logger.debug("running " + this + " one last time.");
task();
}
}
}

View File

@ -0,0 +1,38 @@
@startuml
'https://plantuml.com/sequence-diagram
'autonumber
boundary Scope as scope
control Parent as p
entity Child as c
== attachment ==
create p
scope -> p : new
create c
p -> c : new(p,...)
p <- c : attach
activate p
p -> c :
deactivate p
== detachment, parent initiated ==
p -> c: beforeDetach()
activate c
c -> c : internal logic,\nflushing, closing
p <- c :
deactivate c
p -> p : detachChild(c)
deactivate p
== detachment, child initiated ==
p <- c: detachChild(self)
activate p
p -> c:
deactivate p
@enduml

View File

@ -0,0 +1,48 @@
@startuml
'https://plantuml.com/sequence-diagram
'autonumber
boundary Scope as scope
control Parent as p
entity Child as c
entity External as e
== attachment ==
create p
scope -> p : new
create c
alt child constructor
p -> c : new(p,...)
c -> c : pre-init in super(...)
p <- c : attachChild(self)
activate p
p -> c:
deactivate p
c -> c : post-attach init
activate c
c --> e: connect
c <-- e: <connected>
deactivate c
p <- c:
end
== detachment always mediated by child ==
alt child finalize
c -> c: shutdown()
activate c
c -> c: flush data and cleanup
p <- c: detachChild(self)
activate p
p -> c
deactivate p
deactivate c
end
@enduml

View File

@ -69,7 +69,30 @@
* component type should be solely informational about the structure and properties of the component hierarchy when * component type should be solely informational about the structure and properties of the component hierarchy when
* possible. This will help organize usage patterns around themed utilities and keep the surface area of the core types * possible. This will help organize usage patterns around themed utilities and keep the surface area of the core types
* to a minimum.</p> * to a minimum.</p>
* <HR/>
* <H2>Types of Components</H2>
* The purpose of a component can determine its style of implementation.
* *
* <H3>Life-cycle Oriented Components</H3>
* <P>Life-cycle components represent executions of some user-managed scope, like session, scenarios, or activities.
* These components model the nesting structure of threads of execution, or for activities, groups of threads. As such,
* any sub-components they have are generally there to augment or contextualize the execution of the life-cycle component.
* In this case, the life-cycle component controls the life-line of its sub-components. When the life-cycle component is
* ready to finish its own execution, it will directly inform all components attached to it that it is time for them
* to do final housekeeping, including any final buffering and sending, connection tear-down, etc. As components, the parent
* component may not know what the details of these housekeeping steps are directly. But components are always something else too,
* and in the type-specific implementations which are triggered by component methods, appropriate integrations can take place.</P>
*
* <H3>Service Oriented Components</H3>
* <p>Service components are those which are created as an attachment to other components. They may or may not have additional asynchronous
* behavior with respect to their parent component, but they are always in service of the parent component. For example, metrics instruments
* like counters are passive, but reporters which send these metrics outside of the system are active and on a schedule.
* Service-oriented components generally do not control the duration of their lifetimes. When they are working properly, they exist
* along-side and attached to their parent component for the full lifetime of that parent, and then are reaped on demand by the parent
* component.</p>
*
* <HR/>
* <H2>Labeling Consistency and interlocks</H2>
* TODO: labeling consistency * TODO: labeling consistency
*/ */
package io.nosqlbench.components; package io.nosqlbench.components;

View File

@ -0,0 +1,96 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
class AttachedMetricsSummaryReporterTest {
private final Logger logger = LogManager.getLogger(AttachedMetricsSummaryReporterTest.class);
@Test
public void testSingleObjectScope() {
try (TestComponent root = new TestComponent("root", "root")) {
try {
Thread.sleep(10000L);
} catch (InterruptedException ignored) {
}
logger.debug("scope ending");
}
}
// TODO: this output should also include the node itself
// TODO: end lifecycle events need to be supported for metrics flushing
@Test
public void testAttachedReporterScope() {
try (NBComponentSubScope scope = new NBComponentSubScope()) {
TestComponent root = new TestComponent("root", "root");
scope.add(root);
TestComponent l1 = new TestComponent(root, "l1", "l1");
NBMetricCounter counter = l1.create().counter("mycounter");
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(1);
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);
// This wait state is here only to emulate some time passing while background processing
// in the component hierarchy runs. Without it, you would be standing and immediate tearing
// down the structure, which is not a realistic scenario, but is probably a meaningful
// robustness test in and of itself
try {
Thread.sleep(3_000L);
} catch (InterruptedException ignored) {
}
logger.debug("scope ending");
}
// TODO: this output should also include the node itself
// TODO: end lifecycle events need to be supported for metrics flushing
}
@Test
public void testAttachedReporter() {
TestComponent root = new TestComponent("root", "root");
TestComponent l1 = new TestComponent(root, "l1", "l1");
NBMetricCounter counter = l1.create().counter("mycounter");
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(5);
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);
// TODO: this output should also include the node itself
// TODO: end lifecycle events need to be supported for metrics flushing
try {
Thread.sleep(2_000L);
} catch (InterruptedException ignored) {
}
reporter.close();
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.config.standard.TestComponent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
class NBComponentScaffoldingTest {
private final static Logger logger = LogManager.getLogger(NBComponentScaffoldingTest.class);
@Test
public void testBasicLayeringTeardown() {
TestComponent root = new TestComponent("root","root");
TestComponent a1 = new TestComponent(root,"a1","a1") {
@Override
protected void teardown() {
logger.debug("tearing down " + description());
}
};
TestComponent b2 = new TestComponent(a1,"b2","b2") {
@Override
protected void teardown() {
logger.debug("tearing down " + description());
}
};
root.close();
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
class NBComponentServicesTest {
@Test
public void testComponentServices() {
TestComponent root = new TestComponent("root", "root");
TestComponent a1 = new TestComponent(root, "a1", "a1");
TestComponent b1 = new TestComponent(a1, "b1", "b1");
NBMetricTimer timer1 = a1.create().timer("mfn1", 3);
String handle = timer1.getHandle();
timer1.update(23L, TimeUnit.MILLISECONDS);
NBMetric foundByHandle = root.find().metric(handle);
assertThat(foundByHandle).isEqualTo(timer1);
NBMetric foundByPattern = root.find().metric("name:mfn1");
assertThat(foundByPattern).isEqualTo(timer1);
NBFunctionGauge gauge = b1.create().gauge("test_gauge", () -> 5.2d);
String gaugeHandle = gauge.getHandle();
List<NBMetric> metricsInTree = root.findMetricsInTree("");
assertThat(metricsInTree).containsAll(List.of(timer1, gauge));
metricsInTree.forEach(m -> {
System.out.println("metric: " + m.toString());
});
}
}

View File

@ -42,6 +42,9 @@
</Appenders> </Appenders>
<Loggers> <Loggers>
<Logger name="StatusConsoleListener" level="trace">
<AppenderRef ref="STDOUT"/>
</Logger>
<Logger name="io.nosqlbench.docsys" level="info" additivity="false"> <Logger name="io.nosqlbench.docsys" level="info" additivity="false">
<AppenderRef ref="APPSLOG"/> <AppenderRef ref="APPSLOG"/>