Merge branch 'jshook/heartbeat'

This commit is contained in:
Jonathan Shook 2024-01-09 15:06:50 -06:00
commit ed24a67916
18 changed files with 685 additions and 187 deletions

View File

@ -401,25 +401,28 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
// intentionally not shown for warn-only // intentionally not shown for warn-only
NBCLI.logger.info(() -> "console logging level is " + options.getConsoleLogLevel()); NBCLI.logger.info(() -> "console logging level is " + options.getConsoleLogLevel());
Map<String, String> props = Map.of(
"summary", options.getReportSummaryTo(),
"logsdir", options.getLogsDirectory().toString(),
"progress", options.getProgressSpec(),
"prompush_cache", "prompush_cache.txt",
"heartbeat", String.valueOf(options.wantsHeartbeatIntervalMs())
);
/** /**
* At this point, the command stream from the CLI should be handed into the session, and the session should * At this point, the command stream from the CLI should be handed into the session, and the session should
* marshal and transform it for any scenario invocations directly. * marshal and transform it for any scenario invocations directly.
*/ */
try (
NBSession session = new NBSession( NBSession session = new NBSession(
new NBBaseComponent(null, new NBBaseComponent(null,
options.getLabelMap() options.getLabelMap()
.andDefault("jobname", "nosqlbench") .andDefault("jobname", "nosqlbench")
.andDefault("instance", "default") .andDefault("instance", "default")
), ),
sessionName sessionName,
); props
// TODO: Decide whether this should be part of ctor consistency )) {
Map.of(
"summary", options.getReportSummaryTo(),
"logsdir", options.getLogsDirectory().toString(),
"progress", options.getProgressSpec(),
"prompush_cache", "prompush_cache.txt"
).forEach(session::setComponentProp);
options.wantsReportCsvTo().ifPresent(cfg -> { options.wantsReportCsvTo().ifPresent(cfg -> {
MetricInstanceFilter filter = new MetricInstanceFilter(); MetricInstanceFilter filter = new MetricInstanceFilter();
@ -446,9 +449,10 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
ExecutionResult sessionResult = session.apply(options.getCommands()); ExecutionResult sessionResult = session.apply(options.getCommands());
// sessionResult.printSummary(System.out);
logger.info(sessionResult); logger.info(sessionResult);
return sessionResult.getStatus().code; return sessionResult.getStatus().code;
}
// sessionResult.printSummary(System.out);
} }
@ -465,10 +469,8 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
} }
basicHelp = basicHelp.replaceAll("PROG", this.commandName); basicHelp = basicHelp.replaceAll("PROG", this.commandName);
return basicHelp; return basicHelp;
} }
@Override @Override
public NBLabels getLabels() { public NBLabels getLabels() {
return labels; return labels;

View File

@ -63,6 +63,8 @@ public class NBCLIOptions {
private static final String LABELSPEC = "--labelspec"; private static final String LABELSPEC = "--labelspec";
private static final String ANNOTATORS_CONFIG = "--annotators"; private static final String ANNOTATORS_CONFIG = "--annotators";
private static final String HEARTBEAT_MILLIS = "--heartbeat-millis";
// Enabled if the TERM env var is provided // Enabled if the TERM env var is provided
private static final String ANSI = "--ansi"; private static final String ANSI = "--ansi";
@ -202,6 +204,7 @@ public class NBCLIOptions {
private String annotateLabelSpec = ""; private String annotateLabelSpec = "";
private String metricsLabelSpec = ""; private String metricsLabelSpec = "";
private String wantsToCatResource =""; private String wantsToCatResource ="";
private long heartbeatIntervalMs;
public boolean wantsLoggedMetrics() { public boolean wantsLoggedMetrics() {
return this.wantsConsoleMetrics; return this.wantsConsoleMetrics;
@ -642,6 +645,11 @@ public class NBCLIOptions {
arglist.removeFirst(); arglist.removeFirst();
this.wantsToCatResource = this.readWordOrThrow(arglist, "workload to cat"); this.wantsToCatResource = this.readWordOrThrow(arglist, "workload to cat");
break; break;
case HEARTBEAT_MILLIS:
arglist.removeFirst();
this.heartbeatIntervalMs =
Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms"));
break;
default: default:
nonincludes.addLast(arglist.removeFirst()); nonincludes.addLast(arglist.removeFirst());
} }
@ -739,6 +747,10 @@ public class NBCLIOptions {
return this.wantsActivityTypes; return this.wantsActivityTypes;
} }
public long wantsHeartbeatIntervalMs() {
return this.heartbeatIntervalMs;
}
public boolean wantsTopicalHelp() { public boolean wantsTopicalHelp() {
return this.wantsActivityHelp; return this.wantsActivityHelp;
} }

View File

@ -16,6 +16,8 @@
package io.nosqlbench.engine.core.lifecycle.session; package io.nosqlbench.engine.core.lifecycle.session;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBInvokableCommand;
import io.nosqlbench.nb.api.components.status.NBLiveComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBFunctionGauge; import io.nosqlbench.nb.api.engine.metrics.instruments.NBFunctionGauge;
@ -43,7 +45,7 @@ import java.util.function.Function;
* on. * on.
* All NBScenarios are run within an NBSession. * All NBScenarios are run within an NBSession.
*/ */
public class NBSession extends NBBaseComponent implements Function<List<Cmd>, ExecutionResult>, NBTokenWords { public class NBSession extends NBLiveComponent implements Function<List<Cmd>, ExecutionResult>, NBTokenWords {
private final static Logger logger = LogManager.getLogger(NBSession.class); private final static Logger logger = LogManager.getLogger(NBSession.class);
// private final ClientSystemMetricChecker clientMetricChecker; // private final ClientSystemMetricChecker clientMetricChecker;
@ -57,12 +59,15 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
public NBSession( public NBSession(
NBLabeledElement labelContext, NBLabeledElement labelContext,
String sessionName String sessionName,
Map<String, String> props
) { ) {
super( super(
null, null,
labelContext.getLabels() labelContext.getLabels()
.and("session", sessionName) .and("session", sessionName),
props,
"session"
); );
new NBSessionSafetyMetrics(this); new NBSessionSafetyMetrics(this);
@ -86,13 +91,16 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
ResultCollector collector = new ResultCollector(); ResultCollector collector = new ResultCollector();
try (ResultContext results = new ResultContext(collector).ok()) { try (ResultContext results = new ResultContext(collector).ok()) {
for (NBCommandAssembly.CommandInvocation invocation : invocationCalls) { for (NBCommandAssembly.CommandInvocation invocation : invocationCalls) {
try {
String targetContext = invocation.containerName(); String targetContext = invocation.containerName();
String explanation = "in context '" + targetContext + "'";
try (NBInvokableCommand command = invocation.command()) {
explanation += " command '" + command.toString() + "'";
NBBufferedContainer container = getContext(targetContext); NBBufferedContainer container = getContext(targetContext);
NBCommandResult cmdResult = container.apply(invocation.command(), invocation.params()); NBCommandResult cmdResult = container.apply(command, invocation.params());
results.apply(cmdResult); results.apply(cmdResult);
} catch (Exception e) { } catch (Exception e) {
String msg = "While running command '" + invocation.command() + "' in container '" + invocation.containerName() + "', an error occurred: " + e.toString(); String msg = "While running " + explanation + "', an error occurred: " + e.toString();
onError(e);
logger.error(msg); logger.error(msg);
results.error(e); results.error(e);
break; break;

View File

@ -391,7 +391,7 @@ public class LoggerConfig extends ConfigurationFactory {
private void attachAuxLogger(ConfigurationBuilder<BuiltConfiguration> builder, String loggerName, Level fileLevel) { private void attachAuxLogger(ConfigurationBuilder<BuiltConfiguration> builder, String loggerName, Level fileLevel) {
String appenderName = loggerName+(("_APPENDER").toUpperCase()); String appenderName = loggerName+(("_APPENDER").toUpperCase());
String fileName = loggerDir.resolve(getFileBase() + "_"+loggerName+".log").toString().toLowerCase(); String fileName = loggerDir.resolve(getFileBase() + "_"+loggerName.toLowerCase()+".log").toString();
var appender = builder var appender = builder
.newAppender(appenderName, FileAppender.PLUGIN_NAME) .newAppender(appenderName, FileAppender.PLUGIN_NAME)
.addAttribute("append", false) .addAttribute("append", false)

View File

@ -21,7 +21,6 @@ import io.nosqlbench.nb.api.components.events.ComponentOutOfScope;
import io.nosqlbench.nb.api.components.events.DownEvent; import io.nosqlbench.nb.api.components.events.DownEvent;
import io.nosqlbench.nb.api.components.events.NBEvent; import io.nosqlbench.nb.api.components.events.NBEvent;
import io.nosqlbench.nb.api.components.events.UpEvent; import io.nosqlbench.nb.api.components.events.UpEvent;
import io.nosqlbench.nb.api.config.params.ElementData;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetric; import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.nb.api.labels.NBLabels; import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -30,30 +29,37 @@ import org.apache.logging.log4j.Logger;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class NBBaseComponent extends NBBaseComponentMetrics implements NBComponent, NBTokenWords { public class NBBaseComponent extends NBBaseComponentMetrics implements NBComponent, NBTokenWords, NBComponentTimeline {
private final static Logger logger = LogManager.getLogger("RUNTIME"); private final static Logger logger = LogManager.getLogger("RUNTIME");
protected final NBComponent parent; protected final NBComponent parent;
protected final NBLabels labels; protected final NBLabels labels;
private final List<NBComponent> children = new ArrayList<>(); private final List<NBComponent> children = new ArrayList<>();
private long endAt=0L;
private final long startAt;
protected NBMetricsBuffer metricsBuffer = new NBMetricsBuffer(); protected NBMetricsBuffer metricsBuffer = new NBMetricsBuffer();
protected boolean bufferOrphanedMetrics = false; protected boolean bufferOrphanedMetrics = false;
private ConcurrentHashMap<String,String> props = new ConcurrentHashMap<>(); private ConcurrentHashMap<String,String> props = new ConcurrentHashMap<>();
protected Exception error;
protected long started_ns, teardown_ns, closed_ns, errored_ns;
protected NBInvokableState state = NBInvokableState.STARTING;
public NBBaseComponent(NBComponent parentComponent) { public NBBaseComponent(NBComponent parentComponent) {
this(parentComponent, NBLabels.forKV()); this(parentComponent, NBLabels.forKV());
} }
public NBBaseComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly) { public NBBaseComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly) {
this.started_ns = System.nanoTime();
this.labels = componentSpecificLabelsOnly; this.labels = componentSpecificLabelsOnly;
this.startAt = System.nanoTime();
if (parentComponent != null) { if (parentComponent != null) {
parent = parentComponent; parent = parentComponent;
parent.attachChild(this); parent.attachChild(this);
} else { } else {
parent = null; parent = null;
} }
state = (state==NBInvokableState.ERRORED) ? state : NBInvokableState.RUNNING;
}
public NBBaseComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly, Map<String, String> props) {
this(parentComponent,componentSpecificLabelsOnly);
props.forEach(this::setComponentProp);
} }
@Override @Override
@ -117,30 +123,40 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
@Override @Override
public final void close() throws RuntimeException { public final void close() throws RuntimeException {
state = (state==NBInvokableState.ERRORED) ? state : NBInvokableState.CLOSING;
closed_ns = System.nanoTime();
try { try {
logger.debug("cleaning up"); logger.debug("cleaning up");
ArrayList<NBComponent> children = new ArrayList<>(getChildren()); ArrayList<NBComponent> children = new ArrayList<>(getChildren());
for (NBComponent child : children) { for (NBComponent child : children) {
child.close(); child.close();
} }
teardown();
} catch (Exception e) { } catch (Exception e) {
logger.error(e); onError(e);
} finally { } finally {
logger.debug("detaching " + description()); logger.debug("detaching " + description());
if (parent != null) { if (parent != null) {
parent.detachChild(this); parent.detachChild(this);
} }
teardown();
} }
} }
public void onError(Exception e) {
RuntimeException wrapped = new RuntimeException("While in state " + this.state + ", an error occured: " + e, e);
logger.error(wrapped);
this.error = wrapped;
state=NBInvokableState.ERRORED;
}
/** /**
* Override this method in your component implementations when you need to do something * Override this method in your component implementations when you need to do something
* to close out your component. * to close out your component.
*/ */
protected void teardown() { protected void teardown() {
logger.debug("tearing down " + description()); logger.debug("tearing down " + description());
this.endAt = System.nanoTime(); this.teardown_ns = System.nanoTime();
this.state=(state==NBInvokableState.ERRORED) ? state : NBInvokableState.STOPPED;
} }
@Override @Override
@ -229,10 +245,10 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
@Override @Override
public long getNanosSinceStart() { public long getNanosSinceStart() {
if (endAt==0) { if (teardown_ns ==0) {
return System.nanoTime()-startAt; return System.nanoTime()- started_ns;
} else { } else {
return endAt-startAt; return teardown_ns - started_ns;
} }
} }
@ -255,4 +271,29 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
props.put(name, value); props.put(name, value);
return this; return this;
} }
@Override
public NBInvokableState getComponentState() {
return state;
}
@Override
public long nanosof_start() {
return this.started_ns;
}
@Override
public long nanosof_close() {
return this.closed_ns;
}
@Override
public long nanosof_teardown() {
return this.teardown_ns;
}
@Override
public long nanosof_error() {
return this.errored_ns;
}
} }

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.core;
public interface NBComponentTimeline {
NBInvokableState getComponentState();
/**
* This will be 0L if the component hasn't fully started, else it will be
* the {@link System#nanoTime()} of when the component entered its constructor
* @return nanosecond time of component construction
*/
long nanosof_start();
/**
* This will be 0L if the component hasn't began the process of closing down.
* @return nanosecond time of invoking {@link NBBaseComponent#close()}
*/
long nanosof_close();
/**
* This will be 0L if the component hasn't completed teardown. Otherwise it will be
* the {@link System#nanoTime()} when the base teardown logic in the component has completed.
* For this reason, it is imperative that any overrides to {@link NBBaseComponent#teardown()}
* are called, and called last in the overridden teardown method.
* @return nanosecond time of teardown completion
*/
long nanosof_teardown();
/**
* This will be 0L if the component hasn't logged an error. Otherwise it will be
* the {@link System#nanoTime()} of when the error was reported.
* @return nanosecond time of the error
*/
long nanosof_error();
}

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.core;
/**
* <PRE>{@code
* errored_at > 0 -> ERROR
* started_at == 0 -> STARTING
* <p>
* <p>
* <p>
* started_at > closed_at
* STARTING
* closed_at > started_at
* RUNNING
* teardown_at > closed_at
* STOPPING
* teardown_at
* STOPPED
* stopped_at
* }</pre>
*/
public enum NBInvokableState {
/**
* The component exists in some state but has not completed initialization / construction
*/
STARTING,
/**
* The component has completed initialization and is presumed to be running
*/
RUNNING,
/**
* The component has begun closing down, which means unwinding/closing any child components
*/
CLOSING,
/**
* The component has completed closing down, including its teardown logic
*/
STOPPED,
/**
* There was an error
*/
ERRORED;
}

View File

@ -25,90 +25,10 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public abstract class PeriodicTaskComponent extends NBBaseComponent implements Runnable { public abstract class PeriodicTaskComponent extends UnstartedPeriodicTaskComponent {
private static final Logger logger = LogManager.getLogger(PeriodicTaskComponent.class); public PeriodicTaskComponent(NBComponent node, NBLabels extraLabels, long millis, String threadName, FirstReport firstReport, LastReport lastReport) {
private final long intervalmillis; super(node, extraLabels, millis, threadName, firstReport, lastReport);
private final Lock lock = new ReentrantLock(); start();
private final Condition shutdownSignal = lock.newCondition();
private final boolean oneLastTime;
Thread thread;
private boolean running = true;
public PeriodicTaskComponent(
NBComponent node,
NBLabels extraLabels,
long millis,
boolean oneLastTime,
String threadName
) {
super(node, extraLabels);
this.intervalmillis = millis;
thread = Thread.ofVirtual().name(threadName).start(this);
this.oneLastTime = oneLastTime;
}
protected abstract void task();
@Override
public void run() {
long now = System.currentTimeMillis();
long reportAt = now + intervalmillis;
long waitfor = reportAt - now;
while (running) {
while (running && waitfor > 0) {
boolean signalReceived = false;
try {
lock.lock();
signalReceived = shutdownSignal.await(waitfor, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
if (signalReceived) {
logger.debug("signal shutting down " + this);
return;
}
now = System.currentTimeMillis();
waitfor = reportAt - now;
}
// logger.info("summarizing metrics to console");
try {
task();
} catch (Exception e) {
logger.error(e);
throw new RuntimeException(e);
} finally {
reportAt = reportAt + (intervalmillis);
now = System.currentTimeMillis();
waitfor = reportAt - now;
} }
} }
logger.info("shutting down periodic runnable component: " + description());
}
public void teardown() {
logger.debug("shutting down " + this);
lock.lock();
running = false;
shutdownSignal.signalAll();
lock.unlock();
logger.debug("signaled reporter thread to shut down " + description());
try {
thread.join();
} catch (InterruptedException e) {
logger.warn("interrupted while joining thread");
}
if (oneLastTime) {
logger.debug("running " + this + " one last time.");
task();
}
}
}

View File

@ -0,0 +1,167 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.core;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* <P>Because of ctor super calling order requirements, the task thread can't always be started
* automatically in super(...). If that is the case, then use this class directly and call
* start() at the end of your subtype ctor.</P>
*
* <P>Otherwise, it is safe to use {@link PeriodicTaskComponent} directly.</P>
*/
public abstract class UnstartedPeriodicTaskComponent extends NBBaseComponent implements Runnable {
private static final Logger logger = LogManager.getLogger(UnstartedPeriodicTaskComponent.class);
protected final long intervalmillis;
private final Lock lock = new ReentrantLock();
private final Condition shutdownSignal = lock.newCondition();
private final FirstReport firstReport;
private final LastReport lastReport;
private final String threadName;
Thread thread;
private boolean running = true;
public enum FirstReport {
Immediately,
OnInterval
}
public enum LastReport {
None,
onClose,
/**
* OnInterrupt is a stronger version of OnClose, including scenarios where the process is interrupted with a signal
*/
OnInterrupt
}
public UnstartedPeriodicTaskComponent(
NBComponent node,
NBLabels extraLabels,
long millis,
String threadName,
FirstReport firstReport,
LastReport lastReport
) {
super(node, extraLabels);
this.threadName = threadName;
this.intervalmillis = millis;
this.firstReport = firstReport;
this.lastReport = lastReport;
if(lastReport== LastReport.OnInterrupt) {
Thread hook=new Thread(this::task,"shutdownhook-"+threadName);
Runtime.getRuntime().addShutdownHook(hook);
}
// TODO: There is a potential race condition between init and invoke here, if millis is low enough and post-super() state is needed
}
public void start() {
if (firstReport==FirstReport.Immediately) task();
thread = Thread.ofVirtual().name(threadName).start(this);
}
/**
* This task should only do what is needed once each period.
* If it throws any exceptions, then these exceptions will cause the period task
* to exit. Thus, if you need to allow failures in some cases while keeping
* the caller (scheduler) active, all errors should be caught and handled
* internally.
*/
protected abstract void task();
@Override
public void run() {
long now = System.currentTimeMillis();
long reportAt = now + intervalmillis;
long waitfor = reportAt - now;
while (running) {
while (running && waitfor > 0) {
boolean signalReceived = false;
try {
lock.lock();
signalReceived = shutdownSignal.await(waitfor, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
if (signalReceived) {
logger.debug("signal shutting down " + this);
return;
}
now = System.currentTimeMillis();
waitfor = reportAt - now;
}
// logger.info("summarizing metrics to console");
try {
lock.lock();
task();
} catch (Exception e) {
logger.error(e);
throw new RuntimeException(e);
} finally {
reportAt = reportAt + (intervalmillis);
now = System.currentTimeMillis();
waitfor = reportAt - now;
lock.unlock();
}
}
logger.info("shutting down periodic runnable component: " + description());
}
public void teardown() {
logger.debug("shutting down " + this);
lock.lock();
running = false;
shutdownSignal.signalAll();
lock.unlock();
// if (lastReport==LastReport.onClose || lastReport==LastReport.OnInterrupt) {
// logger.debug("final task() call for period component " + description());
// task();
// }
logger.debug("signaled reporter thread to shut down " + description());
try {
thread.join();
} catch (InterruptedException e) {
logger.warn("interrupted while joining thread");
}
if (this.lastReport== LastReport.onClose) {
logger.debug("running " + this + " one last time on close().");
task();
}
super.teardown();
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.status;
import io.nosqlbench.nb.api.components.core.UnstartedPeriodicTaskComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class ComponentPulse extends UnstartedPeriodicTaskComponent {
private final static Logger logger = LogManager.getLogger(ComponentPulse.class);
private final Path hbpath;
private final NBLiveComponent pulseOf;
public ComponentPulse(NBLiveComponent pulseOf, NBLabels extraLabels, String fileNameLabel, long millis) {
super(
pulseOf,
extraLabels,
millis,
"PULSE-" + pulseOf.description(),
FirstReport.Immediately,
LastReport.OnInterrupt
);
this.pulseOf = pulseOf;
String logsdir = getComponentProp("logsdir").orElseThrow();
this.hbpath = Path.of(logsdir).resolve(pulseOf.getLabels().valueOf(fileNameLabel)+"_status.yaml");
start();
}
@Override
protected void task() {
logger.debug("emitting pulse for :" + this.pulseOf.description());
Heartbeat heartbeat = pulseOf.heartbeat().withHeartbeatDetails(intervalmillis,System.currentTimeMillis());
try {
Files.writeString(hbpath, heartbeat.toYaml(), StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (IOException e) {
logger.error("Unable to write heartbeat data to " + hbpath.toString() + ": " + e);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.status;
import io.nosqlbench.nb.api.components.core.NBInvokableState;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.snakeyaml.engine.v2.api.Dump;
import org.snakeyaml.engine.v2.api.DumpSettings;
import org.snakeyaml.engine.v2.common.FlowStyle;
import java.util.Map;
public record Heartbeat(
NBLabels labels,
NBInvokableState state,
long started_at,
long session_time_ns,
long heartbeat_interval_ms,
long heartbeat_epoch_ms
) {
public final static Dump dump = createDump();
private static Dump createDump() {
DumpSettings settings = DumpSettings.builder().setDefaultFlowStyle(FlowStyle.BLOCK).build();
return new Dump(settings, new HeartbeatRepresenter(settings));
}
public Heartbeat withHeartbeatDetails(long new_heartbeat_interval_ms, long new_heartbeat_ms_epoch) {
return new Heartbeat(
labels,
state,
started_at,
session_time_ns,
new_heartbeat_interval_ms,
new_heartbeat_ms_epoch
);
}
public String toYaml() {
return toString();
}
public Map<String, Object> toMap() {
return Map.of(
"labels", labels.asMap(),
"state", state,
"started_at_epochms", started_at,
"session_time_ns", session_time_ns,
"heartbeat_interval_ms", heartbeat_interval_ms,
"heartbeat_epoch_ms", heartbeat_epoch_ms
);
}
@Override
public String toString() {
return dump.dumpToString(toMap());
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.status;
import io.nosqlbench.nb.api.components.core.NBInvokableState;
import org.snakeyaml.engine.v2.api.DumpSettings;
import org.snakeyaml.engine.v2.api.RepresentToNode;
import org.snakeyaml.engine.v2.nodes.Node;
import org.snakeyaml.engine.v2.representer.StandardRepresenter;
public class HeartbeatRepresenter extends StandardRepresenter {
public HeartbeatRepresenter(DumpSettings settings) {
super(settings);
this.representers.put(NBInvokableState.class, new RepresentEnumToString());
}
public class RepresentEnumToString implements RepresentToNode {
@Override
public Node representData(Object o) {
if (o instanceof Enum<?> e) {
String name = e.name();
return HeartbeatRepresenter.this.represent(name);
} else {
throw new RuntimeException("Unable to represent as enum: " + o.toString() + " (class " + o.getClass().getSimpleName() + "'");
}
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nb.api.components.status;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.core.NBInvokableState;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.Map;
/**
* A <EM>live</EM> component is one which provides evidence that it is either
* in a healthy state or that it is not, via a heartbeat mechanism.
*/
public class NBLiveComponent extends NBBaseComponent {
public NBLiveComponent(NBComponent parentComponent) {
super(parentComponent);
}
public NBLiveComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly, Map<String, String> props, String liveLabel) {
super(parentComponent, componentSpecificLabelsOnly, props);
// attaches, no further reference needed
new ComponentPulse(this, NBLabels.forKV(), liveLabel, Long.parseLong(getComponentProp("heartbeat").orElse("60000")));
}
public Heartbeat heartbeat() {
return new Heartbeat(
getLabels(),
this.getComponentState(),
started_ns,
sessionTimeMs(),
0L,
0L
);
}
private long sessionTimeMs() {
NBInvokableState state = getComponentState();
long nanos = switch (state) {
case ERRORED -> (nanosof_error() - nanosof_start());
case STARTING, RUNNING -> (System.nanoTime() - nanosof_start());
case CLOSING -> (nanosof_close() - nanosof_start());
case STOPPED -> (nanosof_teardown() - nanosof_start());
};
return nanos / 1_000_000L;
}
}

View File

@ -44,7 +44,7 @@ public class ConsoleReporter extends PeriodicTaskComponent {
public ConsoleReporter(NBComponent node, NBLabels extraLabels, long millis, boolean oneLastTime, public ConsoleReporter(NBComponent node, NBLabels extraLabels, long millis, boolean oneLastTime,
PrintStream output, Set<MetricAttribute> disabledMetricAttributes) { PrintStream output, Set<MetricAttribute> disabledMetricAttributes) {
super(node, extraLabels, millis, oneLastTime, "REPORT-CONSOLE"); super(node, extraLabels, millis, "REPORT-CONSOLE", FirstReport.OnInterval, LastReport.OnInterrupt);
this.output = output; this.output = output;
this.dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, this.dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT,
DateFormat.MEDIUM, DateFormat.MEDIUM,

View File

@ -59,7 +59,7 @@ public class CsvReporter extends PeriodicTaskComponent {
public CsvReporter(NBComponent node, Path reportTo, long intervalMs, MetricInstanceFilter filter, public CsvReporter(NBComponent node, Path reportTo, long intervalMs, MetricInstanceFilter filter,
NBLabels extraLabels) { NBLabels extraLabels) {
super(node, extraLabels, intervalMs, false,"REPORT-CSV"); super(node, extraLabels, intervalMs, "REPORT-CSV", FirstReport.OnInterval, LastReport.OnInterrupt);
this.component = node; this.component = node;
this.reportTo = reportTo; this.reportTo = reportTo;
this.filter = filter; this.filter = filter;

View File

@ -51,7 +51,7 @@ public class Log4JMetricsReporter extends PeriodicTaskComponent {
final NBLabels extraLabels, final NBLabels extraLabels,
final long millis, final long millis,
final boolean oneLastTime) { final boolean oneLastTime) {
super(component, extraLabels, millis, oneLastTime,"REPORT-LOG4J"); super(component, extraLabels, millis, "REPORT-LOG4J", FirstReport.OnInterval, LastReport.OnInterrupt);
this.loggerProxy = loggerProxy; this.loggerProxy = loggerProxy;
this.marker = marker; this.marker = marker;
} }

View File

@ -51,7 +51,7 @@ public class PromPushReporterComponent extends PeriodicTaskComponent {
private String bearerToken; private String bearerToken;
public PromPushReporterComponent(NBComponent parent, String endpoint, long intervalMs, NBLabels nbLabels) { public PromPushReporterComponent(NBComponent parent, String endpoint, long intervalMs, NBLabels nbLabels) {
super(parent, nbLabels.and("_type", "prom-push"), intervalMs, true, "REPORT-PROMPUSH"); super(parent, nbLabels.and("_type", "prom-push"), intervalMs, "REPORT-PROMPUSH",FirstReport.OnInterval, LastReport.OnInterrupt);
String jobname = getLabels().valueOfOptional("jobname").orElse("default"); String jobname = getLabels().valueOfOptional("jobname").orElse("default");
String instance = getLabels().valueOfOptional("instance").orElse("default"); String instance = getLabels().valueOfOptional("instance").orElse("default");
if (jobname.equals("default") || instance.equals("default")) { if (jobname.equals("default") || instance.equals("default")) {

View File

@ -30,6 +30,7 @@ import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.ExecutionMode;
import java.util.List; import java.util.List;
import java.util.Map;
@Disabled @Disabled
@Execution(ExecutionMode.SAME_THREAD) @Execution(ExecutionMode.SAME_THREAD)
@ -44,7 +45,7 @@ public class ScenarioExampleTests {
NBCLIOptions parser = new NBCLIOptions(params); NBCLIOptions parser = new NBCLIOptions(params);
List<Cmd> commands = parser.getCommands(); List<Cmd> commands = parser.getCommands();
var myroot = new TestComponent("test_"+params[0]); var myroot = new TestComponent("test_"+params[0]);
NBSession session = new NBSession(myroot,"session_"+params[0]); NBSession session = new NBSession(myroot,"session_"+params[0], Map.of());
System.out.println("=".repeat(29) + " Running scenario test for example scenario: " + params[0]); System.out.println("=".repeat(29) + " Running scenario test for example scenario: " + params[0]);
ExecutionResult result = session.apply(commands); ExecutionResult result = session.apply(commands);
return result; return result;