bursty cycle rate test online

This commit is contained in:
Jonathan Shook 2023-10-11 22:51:52 -05:00
parent 32b13c0134
commit bff86b5525
42 changed files with 500 additions and 440 deletions

View File

@ -20,8 +20,8 @@ import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.api.extensions.SandboxExtensionFinder;
import io.nosqlbench.api.extensions.ScriptingExtensionPluginInfo;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.BundledExtensions;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.BindPoint;
@ -75,7 +75,7 @@ public class GroovyCycleFunction<T> implements CycleFunction<T> {
}
private void addServices() {
for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : BundledExtensions.findAll()) {
staticImports.addAll(extensionDescriptor.autoImportStaticMethodClasses());
if (!extensionDescriptor.isAutoLoading()) {
logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");

View File

@ -81,9 +81,16 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV());
this.spec = spec;
initMetrics();
startFiller();
}
private void initMetrics() {
create().gauge("cycles_waittime",() -> (double)getWaitTimeDuration().get(ChronoUnit.NANOS));
create().gauge("_config_cyclerate", () -> spec.opsPerSec);
create().gauge("_config_burstrate", () -> spec.burstRatio);
}
public long refill() {
try {

View File

@ -116,7 +116,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
@Override
public synchronized void initActivity() {
// initOrUpdateRateLimiters(this.activityDef);
initOrUpdateRateLimiters(this.activityDef);
}
public synchronized NBErrorHandler getErrorHandler() {
@ -307,7 +307,6 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));

View File

@ -43,7 +43,7 @@ public class ScriptEnvBuffer extends SimpleScriptContext {
synchronized(this) {
if (stdoutBuffer==null) {
Writer superWriter = super.getWriter();
stdoutBuffer = new DiagWriter(superWriter, new InterjectingCharArrayWriter(" stdout "));
stdoutBuffer = new DiagWriter(new InterjectingCharArrayWriter(" stdout "),superWriter);
}
}
}
@ -56,7 +56,7 @@ public class ScriptEnvBuffer extends SimpleScriptContext {
synchronized(this) {
if (stderrBuffer==null) {
Writer superErrorWriter = super.getErrorWriter();
stderrBuffer = new DiagWriter(superErrorWriter, new InterjectingCharArrayWriter(" stderr "));
stderrBuffer = new DiagWriter(new InterjectingCharArrayWriter(" stderr "),superErrorWriter);
}
}
}

View File

@ -23,7 +23,7 @@ import com.codahale.metrics.MetricFilter;
import io.nosqlbench.api.engine.metrics.instruments.*;
import io.nosqlbench.api.engine.metrics.reporters.ConsoleReporter;
import io.nosqlbench.api.engine.metrics.reporters.Log4JMetricsReporter;
import io.nosqlbench.components.NBBuilders;
import io.nosqlbench.components.NBCreators;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBComponentTraversal;
import io.nosqlbench.components.NBFinders;
@ -63,7 +63,7 @@ public class ExecutionMetricsResult extends ExecutionResult {
public String getMetricsSummary(NBComponent component) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try (final PrintStream ps = new PrintStream(os)) {
final NBBuilders.ConsoleReporterBuilder builder = new NBBuilders.ConsoleReporterBuilder(component, ps);
final NBCreators.ConsoleReporterBuilder builder = new NBCreators.ConsoleReporterBuilder(component, ps);
final Set<MetricAttribute> disabled = new HashSet<>(ExecutionMetricsResult.INTERVAL_ONLY_METRICS);
if (60000 > this.getElapsedMillis()) disabled.addAll(ExecutionMetricsResult.OVER_ONE_MINUTE_METRICS);
builder.disabledMetricAttributes(disabled);
@ -90,7 +90,7 @@ public class ExecutionMetricsResult extends ExecutionResult {
ExecutionResult.logger.debug("-- active data on this last report. (The workload has already stopped.) Record --");
ExecutionResult.logger.debug("-- metrics to an external format to see values for each reporting interval. --");
ExecutionResult.logger.debug("-- BEGIN METRICS DETAIL --");
final Log4JMetricsReporter reporter = new NBBuilders.Log4jReporterBuilder(component)
final Log4JMetricsReporter reporter = new NBCreators.Log4jReporterBuilder(component)
.withLoggingLevel(Log4JMetricsReporter.LoggingLevel.DEBUG)
.filter(MetricFilter.ALL)
.outputTo(ExecutionResult.logger)

View File

@ -18,7 +18,6 @@ package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
import java.io.*;
@ -61,58 +60,44 @@ public class NBDefaultSceneFixtures implements NBSceneFixtures {
* Extensions provide additional scripting capabilities which are not provided by the
* scripting or other runtimes, or new ways of tapping into extant features.
*/
private Extensions extensions;
private PrintWriter out;
private PrintWriter err;
private Reader in;
public NBDefaultSceneFixtures(ScenarioParams params, NBComponent parent, ScenarioActivitiesController controller, Extensions extensions, PrintWriter out, PrintWriter err, Reader in) {
public NBDefaultSceneFixtures(ScenarioParams params, ScenarioActivitiesController controller, PrintWriter out, PrintWriter err, Reader in) {
this.params = params;
this.session = parent;
this.controller = controller;
this.extensions = extensions;
this.out = out;
this.err = err;
this.in = in;
}
public static NBSceneFixtures ofDefault(String name) {
return new NBDefaultSceneFixtures(
new ScenarioParams(),
new NBSession(
new TestComponent("scene", name), "scene~"+name
),
new ScenarioActivitiesController(),
Extensions.ofNone(),
new PrintWriter(System.out),
new PrintWriter(System.err),
new InputStreamReader(System.in)
);
}
// public static NBSceneFixtures ofDefault(String name) {
// return new NBDefaultSceneFixtures(
// new ScenarioParams(),
// new NBSession(
// new TestComponent("scene", name), "scene~"+name
// ),
// new ScenarioActivitiesController(),
// new PrintWriter(System.out),
// new PrintWriter(System.err),
// new InputStreamReader(System.in)
// );
// }
//
@Override
public ScenarioParams params() {
return params;
}
@Override
public NBComponent component() {
return session;
}
@Override
public ScenarioActivitiesController controller() {
return controller;
}
@Override
public Extensions extensions() {
return extensions;
}
@Override
public PrintWriter out() {
return out;

View File

@ -21,7 +21,6 @@ import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.scripting.DiagReader;
import io.nosqlbench.engine.api.scripting.DiagWriter;
import io.nosqlbench.engine.api.scripting.InterjectingCharArrayWriter;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.PrintWriter;
import java.io.Reader;
@ -35,6 +34,7 @@ public class NBSceneBuffer implements NBSceneFixtures {
virtual,
traced
}
private final IOType iotype;
private DiagWriter stdoutBuffer;
private DiagWriter stderrBuffer;
@ -46,8 +46,8 @@ public class NBSceneBuffer implements NBSceneFixtures {
switch (iotype) {
case traced:
stdoutBuffer = new DiagWriter(fixtures.out(), new InterjectingCharArrayWriter(" stdout "));
stderrBuffer = new DiagWriter(fixtures.err(), new InterjectingCharArrayWriter(" stderr "));
stdoutBuffer = new DiagWriter(new InterjectingCharArrayWriter(" stdout "), fixtures.out());
stderrBuffer = new DiagWriter(new InterjectingCharArrayWriter(" stderr "), fixtures.err());
stdinBuffer = new DiagReader(fixtures.in(), " stdin ");
break;
case virtual:
@ -58,7 +58,7 @@ public class NBSceneBuffer implements NBSceneFixtures {
case connected:
stdoutBuffer = new DiagWriter(fixtures.out());
stderrBuffer = new DiagWriter(fixtures.err());
stdinBuffer = new DiagReader(fixtures.in(), " stdin ");
stdinBuffer = new DiagReader(fixtures.in());
break;
}
@ -74,21 +74,11 @@ public class NBSceneBuffer implements NBSceneFixtures {
return fixtures.params();
}
@Override
public NBComponent component() {
return fixtures.component();
}
@Override
public ScenarioActivitiesController controller() {
return fixtures.controller();
}
@Override
public Extensions extensions() {
return fixtures.extensions();
}
@Override
public PrintWriter out() {
return stdoutBuffer;
@ -105,19 +95,19 @@ public class NBSceneBuffer implements NBSceneFixtures {
}
public String getIOLog() {
return this.stdoutBuffer.getTimedLog()+this.stderrBuffer.getTimedLog();
return this.stdoutBuffer.getTimedLog() + this.stderrBuffer.getTimedLog();
}
public NBSceneFixtures asFixtures() {
return (NBSceneFixtures) this;
}
public static NBSceneBuffer init(String name) {
TestComponent root = new TestComponent("scene", "self");
return new NBSceneBuffer(NBDefaultSceneFixtures.ofDefault(name));
}
public static SceneBuilderFacets.WantsContext builder() {
public static SceneBuilderFacets.WantsController builder() {
return new SceneBuilder();
}
public static NBSceneBuffer traced(NBComponent component) {
return builder().tracedIO().build(component);
}
}

View File

@ -17,7 +17,6 @@
package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.PrintWriter;
import java.io.Reader;
@ -25,20 +24,13 @@ import java.io.Reader;
public interface NBSceneFixtures {
ScenarioParams params();
NBComponent component();
ScenarioActivitiesController controller();
Extensions extensions();
PrintWriter out();
PrintWriter err();
Reader in();
public static NBSceneFixtures NEW(String sceneName) {
return NBDefaultSceneFixtures.ofDefault(sceneName);
}
}

View File

@ -52,14 +52,6 @@ public class ScenarioActivitiesController extends NBBaseComponent {
private final ExecutorService executorService;
public ScenarioActivitiesController() {
super(new TestComponent("test","test"));
this.activityLoader = new ActivityLoader();
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler);
this.executorService = Executors.newVirtualThreadPerTaskExecutor();
}
public ScenarioActivitiesController(NBComponent parent) {
super(parent);
this.activityLoader = new ActivityLoader();

View File

@ -19,7 +19,6 @@ package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.BufferedReader;
import java.io.InputStreamReader;
@ -30,27 +29,17 @@ import java.util.Map;
public class SceneBuilder implements SceneBuilderFacets.ALL {
private Map<String,String> params = Map.of();
private ScenarioActivitiesController controller;
private Extensions extensions;
private PrintWriter out = new PrintWriter(System.out);
private PrintWriter err = new PrintWriter(System.err);
private Reader in = new InputStreamReader(System.in);
private NBComponent component;
private NBSceneBuffer.IOType iotype;
private NBSceneBuffer.IOType iotype = NBSceneBuffer.IOType.traced;
@Override
public SceneBuilder component(NBComponent component) {
this.component = component;
return this;
}
public NBSceneBuffer build() {
public NBSceneBuffer build(NBComponent forScenario) {
NBDefaultSceneFixtures fixtures =
new NBDefaultSceneFixtures(
ScenarioParams.of(this.params),
this.component,
((this.controller!=null) ? this.controller : new ScenarioActivitiesController(component)),
this.extensions,
((this.controller!=null) ? this.controller : new ScenarioActivitiesController(forScenario)),
this.out,
this.err,
this.in);
@ -71,7 +60,7 @@ public class SceneBuilder implements SceneBuilderFacets.ALL {
}
@Override
public SceneBuilder err(PrintWriter err) {
public SceneBuilderFacets.WantsParams err(PrintWriter err) {
this.err = err;
return this;
}
@ -83,12 +72,6 @@ public class SceneBuilder implements SceneBuilderFacets.ALL {
}
@Override
public SceneBuilder extensions(Extensions extensions) {
this.extensions = extensions;
return this;
}
@Override
public SceneBuilder params(Map<String, String> params) {
this.params=params;
@ -96,19 +79,19 @@ public class SceneBuilder implements SceneBuilderFacets.ALL {
}
@Override
public SceneBuilderFacets.WantsExtensions virtualIO() {
public SceneBuilderFacets.WantsParams virtualIO() {
this.iotype= NBSceneBuffer.IOType.virtual;
return this;
}
@Override
public SceneBuilderFacets.WantsExtensions connectedIO() {
public SceneBuilderFacets.WantsParams connectedIO() {
this.iotype = NBSceneBuffer.IOType.connected;
return this;
}
@Override
public SceneBuilderFacets.WantsExtensions tracedIO() {
public SceneBuilderFacets.WantsParams tracedIO() {
this.iotype=NBSceneBuffer.IOType.traced;
return this;
}

View File

@ -19,7 +19,6 @@ package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.PrintWriter;
import java.io.Reader;
@ -27,9 +26,7 @@ import java.util.Map;
public interface SceneBuilderFacets {
public interface ALL extends
WantsContext,
WantsController,
WantsExtensions,
WantsStderr,
WantsStdout,
WantsStdin,
@ -39,10 +36,6 @@ public interface SceneBuilderFacets {
}
public interface WantsContext {
public WantsController component(NBComponent component);
}
public interface WantsController extends WantsStdin, WantsIoType {
public WantsStdin controller(ScenarioActivitiesController controller);
}
@ -57,7 +50,7 @@ public interface SceneBuilderFacets {
}
public interface WantsStderr extends CanBuild {
public WantsExtensions err(PrintWriter err);
public WantsParams err(PrintWriter err);
}
public interface WantsIoType extends CanBuild {
@ -65,23 +58,19 @@ public interface SceneBuilderFacets {
* If you want the stdin, stdout, stderr streams to be contained only within the scenario's
* execution environment, not connected to the outside world, do this.
*/
public WantsExtensions virtualIO();
public WantsParams virtualIO();
/**
* If you want to connect stdin, stdout, stderr streams to the system in, out and error streams,
* do this.
*/
public WantsExtensions connectedIO();
public WantsParams connectedIO();
/**
* If you want to connect the console IO streams to the outside world, but also capture them for
* diagnostics or other purposes, do this.
*/
public WantsExtensions tracedIO();
}
public interface WantsExtensions extends CanBuild {
public WantsParams extensions(Extensions extensions);
public WantsParams tracedIO();
}
public interface WantsParams extends CanBuild {
@ -89,7 +78,7 @@ public interface SceneBuilderFacets {
}
public interface CanBuild {
NBSceneBuffer build();
NBSceneBuffer build(NBComponent forComponent);
}
}

View File

@ -20,7 +20,6 @@ import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario;
import java.io.PrintWriter;
@ -28,13 +27,11 @@ import java.io.Reader;
import java.io.Writer;
public abstract class SCBaseScenario extends NBScenario {
protected NBComponent component;
protected Reader stdin;
protected PrintWriter stdout;
protected Writer stderr;
protected ScenarioActivitiesController controller;
protected ScenarioParams params;
protected Extensions extensions;
public SCBaseScenario(NBComponent parentComponent, String scenarioName) {
super(parentComponent, scenarioName);
@ -42,14 +39,17 @@ public abstract class SCBaseScenario extends NBScenario {
@Override
protected final void runScenario(NBSceneFixtures shell) {
this.component = shell.component();
this.stdin = shell.in();
this.stdout = shell.out();
this.stderr = shell.err();
this.controller = shell.controller();
this.params = shell.params();
this.extensions = shell.extensions();
invoke();
try {
invoke();
} catch (Exception e) {
stdout.println(e.toString());
throw e;
}
}
/**
@ -62,7 +62,6 @@ public abstract class SCBaseScenario extends NBScenario {
* <LI>stdout, stderr</LI>- a {@link PrintWriter}; This can be buffered virtually, attached to {@link System#out} and {@link System#err} or both for IO tracing.</LI>
* <LI>controller - A dedicated {@link ScenarioActivitiesController} which can be used to define, start, top, and interact with activities.</LI>
* <LI>params - The {@link ScenarioParams} which have been passed to this scenario.</LI>
* <LI>extensions - A dedicated ahndle to the {@link Extensions} service.</LI>
* <LI><EM>all component services</EM> as this scenario IS a component. This includes all implemented methods in any of the {@link NBComponent} sub-interfaces.</EM>
* </LI>
* </UL>

View File

@ -46,6 +46,7 @@ public class ScenarioResult {
}
}
public void exitWithCode() {
System.out.print(getIOLog());
if (exception!=null) {
System.exit(2);
}

View File

@ -69,11 +69,9 @@ public class ScenariosExecutor extends NBBaseComponent {
@NotNull
private NBSceneBuffer getNbSceneBuffer(Map<String, String> params) {
return NBSceneBuffer.builder()
.component(this.getParent())
.tracedIO()
.extensions(loadExtensions())
.params(params)
.build();
.build(parent);
}
@Override
@ -269,30 +267,6 @@ public class ScenariosExecutor extends NBBaseComponent {
this.stoppingException = new RuntimeException("Error in scenario thread " + t.getName(), e);
}
private Extensions loadExtensions() {
Extensions extensions = new Extensions();// TODO: Load component oriented extensions into here
// for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
// if (!extensionDescriptor.isAutoLoading()) {
// this.logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
// continue;
// }
//
// final Logger extensionLogger =
// LogManager.getLogger("extensions." + extensionDescriptor.getBaseVariableName());
// final Object extensionObject = extensionDescriptor.getExtensionObject(
// extensionLogger,
// metricRegistry,
// this.scriptEnv
// );
// ScenarioMetadataAware.apply(extensionObject, this.getScenarioMetadata());
// this.logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
// " class=" + extensionObject.getClass().getSimpleName());
// this.scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
// }
return extensions;
}
public ScenarioResult run(NBScenario scenario, Map<String,String> params) {
return scenario.apply(getNbSceneBuffer(params));
}

View File

@ -21,6 +21,7 @@ import io.nosqlbench.api.spi.SimpleServiceLoader;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponentSubScope;
import io.nosqlbench.components.decorators.NBTokenWords;
import io.nosqlbench.engine.cli.BasicScriptBuffer;
import io.nosqlbench.engine.cli.Cmd;
import io.nosqlbench.engine.cli.ScriptBuffer;
@ -47,7 +48,7 @@ import java.util.function.Function;
* on.
* All NBScenarios are run within an NBSession.
*/
public class NBSession extends NBBaseComponent implements Function<List<Cmd>, ExecutionResult> {
public class NBSession extends NBBaseComponent implements Function<List<Cmd>, ExecutionResult>, NBTokenWords {
private final static Logger logger = LogManager.getLogger(NBSession.class);
private final String sessionName;

View File

@ -1,37 +0,0 @@
package io.nosqlbench.engine.core.lifecycle.scenario.context;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.nosqlbench.api.config.standard.TestComponent;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class SceneBuilderTest {
@Test
public void testBuildDefaultScene() {
NBSceneBuffer buffer = NBSceneBuffer.builder()
.component(new TestComponent("test", "test"))
.tracedIO()
.build();
}
}

View File

@ -21,7 +21,7 @@ import io.nosqlbench.components.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.Logger;
@Service(value = ScriptingExtensionPluginInfo.class, selector = "adder")
@Service(value = ScriptingExtensionPluginInfo.class, selector = "example")
public class ExamplePluginData implements ScriptingExtensionPluginInfo<ExamplePlugin> {
@Override

View File

@ -19,7 +19,7 @@ package io.nosqlbench.api.engine.metrics.reporters;
import com.codahale.metrics.*;
import io.nosqlbench.api.engine.metrics.instruments.*;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBBuilders;
import io.nosqlbench.components.NBCreators;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBFinders;
import io.nosqlbench.components.PeriodicTaskComponent;
@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
public class Log4JMetricsReporter extends PeriodicTaskComponent {
public enum LoggingLevel { TRACE, DEBUG, INFO, WARN, ERROR }
private final NBBuilders.LoggerProxy loggerProxy;
private final NBCreators.LoggerProxy loggerProxy;
private final Marker marker;
private final TimeUnit rateUnit = TimeUnit.NANOSECONDS;
private final TimeUnit durationUnit = TimeUnit.NANOSECONDS;
@ -45,7 +45,7 @@ public class Log4JMetricsReporter extends PeriodicTaskComponent {
private final long rateFactor = TimeUnit.NANOSECONDS.toSeconds(1);
public Log4JMetricsReporter(final NBComponent component,
final NBBuilders.LoggerProxy loggerProxy,
final NBCreators.LoggerProxy loggerProxy,
final Marker marker,
final MetricFilter filter,
final NBLabels extraLabels,

View File

@ -1,42 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.extensions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
public class SandboxExtensionFinder {
private final static List<ScriptingExtensionPluginInfo<?>> extensionDescriptors = new ArrayList<>();
public static List<ScriptingExtensionPluginInfo<?>> findAll() {
if (extensionDescriptors.isEmpty()) {
synchronized (SandboxExtensionFinder.class) {
if (extensionDescriptors.isEmpty()) {
ServiceLoader<ScriptingExtensionPluginInfo> loader =
ServiceLoader.load(ScriptingExtensionPluginInfo.class);
loader.iterator().forEachRemaining(extensionDescriptors::add);
}
}
}
return Collections.unmodifiableList(extensionDescriptors);
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.metadata;
/**
* Where supported, the following named fields are injected into object which
* implement this interface:
* <UL>
* <LI>SCENARIO_NAME - The full scenario name, used for logging, metrics, etc</LI>
* <LI>STARTED_AT_MILLIS - The millisecond timestamp used to create the scenario name</LI>
* <LI>SYSTEM_ID - A stable identifier based on the available ip addresses</LI></LK>
* <LI>SYSTEM_FINGERPRINT - a stable and pseudonymous identifier based on SYSTEM_ID</LI>
* </UL>
*/
public interface ScenarioMetadataAware {
void setScenarioMetadata(ScenarioMetadata metadata);
static void apply(Object target, ScenarioMetadata metadata) {
if (target instanceof ScenarioMetadataAware) {
((ScenarioMetadataAware)target).setScenarioMetadata(metadata);
}
}
}

View File

@ -24,7 +24,6 @@ import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import io.nosqlbench.addins.s3.s3urlhandler.S3ClientCache;
import io.nosqlbench.addins.s3.s3urlhandler.S3UrlFields;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
import io.nosqlbench.api.system.NBEnvironment;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
@ -39,7 +38,7 @@ import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
public class S3Uploader extends NBBaseComponent implements ScenarioMetadataAware {
public class S3Uploader extends NBBaseComponent {
private ScenarioMetadata scenarioMetadata;
private final static Logger logger = LogManager.getLogger(S3Uploader.class);
@ -81,11 +80,7 @@ public class S3Uploader extends NBBaseComponent implements ScenarioMetadataAware
Map<String,String> combined = new LinkedHashMap<>(params);
combined.putAll(scenarioMetadata.asMap());
String url = NBEnvironment.INSTANCE.interpolateWithTimestamp(
urlTemplate,
scenarioMetadata.getStartedAt(),
combined
)
String url = NBEnvironment.INSTANCE.interpolateWithTimestamp(urlTemplate, scenarioMetadata.getStartedAt(), combined)
.orElseThrow();
logger.debug(() -> "S3 composite URL is '" + url + "'");
@ -103,8 +98,4 @@ public class S3Uploader extends NBBaseComponent implements ScenarioMetadataAware
return url;
}
@Override
public void setScenarioMetadata(ScenarioMetadata metadata) {
this.scenarioMetadata = metadata;
}
}

View File

@ -20,22 +20,23 @@ package io.nosqlbench.components;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.decorators.NBTokenWords;
import io.nosqlbench.components.events.NBEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
public class NBBaseComponent extends NBBaseComponentMetrics implements NBComponent {
public class NBBaseComponent extends NBBaseComponentMetrics implements NBComponent, NBTokenWords {
private final static Logger logger = LogManager.getLogger("RUNTIME");
protected final NBComponent parent;
protected final NBLabels labels;
private final List<NBComponent> children = new ArrayList<>();
public NBBaseComponent(NBComponent parentComponent) {
this(parentComponent,NBLabels.forKV());
this(parentComponent, NBLabels.forKV());
}
public NBBaseComponent(NBComponent parentComponent, NBLabels componentSpecificLabelsOnly) {
this.labels = componentSpecificLabelsOnly;
if (parentComponent != null) {
@ -78,7 +79,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
@Override
public NBLabels getLabels() {
NBLabels effectiveLabels = (this.parent == null ? NBLabels.forKV() : parent.getLabels());
effectiveLabels = (this.labels == null ) ? effectiveLabels : effectiveLabels.and(this.labels);
effectiveLabels = (this.labels == null) ? effectiveLabels : effectiveLabels.and(this.labels);
return effectiveLabels;
}
@ -116,8 +117,8 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
}
@Override
public NBBuilders create() {
return new NBBuilders(this);
public NBCreators create() {
return new NBCreators(this);
}
@Override
@ -129,7 +130,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
if (getComponentMetrics().size()>0) {
if (!getComponentMetrics().isEmpty()) {
sb.append(System.lineSeparator()).append("metrics:");
for (NBMetric componentMetric : getComponentMetrics()) {
sb.append(System.lineSeparator()).append(" ").append(componentMetric.toString());
@ -143,7 +144,9 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
public void onEvent(NBEvent event) {
logger.debug(() -> description() + " handling event " + event.toString());
switch (event) {
case UpEvent ue -> { if (parent!=null) parent.onEvent(ue); }
case UpEvent ue -> {
if (parent != null) parent.onEvent(ue);
}
case DownEvent de -> {
for (NBComponent child : children) {
child.onEvent(de);
@ -152,4 +155,25 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
default -> logger.warn("dropping event " + event);
}
}
@Override
public <T> Optional<T> findParentService(Class<T> type) {
return findServiceOn(type, this);
}
private <T> Optional<T> findServiceOn(Class<T> type, NBComponent target) {
if (type.isAssignableFrom(target.getClass())) {
return Optional.of(type.cast(target));
} else if (getParent() != null) {
return findServiceOn(type, getParent());
} else {
return Optional.empty();
}
}
@Override
public Map<String, String> getTokens() {
return getLabels().asMap();
}
}

View File

@ -19,10 +19,7 @@ package io.nosqlbench.components;
import io.nosqlbench.adapters.api.util.TagFilter;
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -58,6 +55,26 @@ public class NBBaseComponentMetrics implements NBComponentMetrics {
return filter.filterLabeled(metrics.values());
}
@Override
public <T> Collection<? extends T> findComponentMetrics(String pattern, Class<T> type) {
if (this.metrics.containsKey(pattern)) {
NBMetric metric = metrics.get(pattern);
if (type.isAssignableFrom(metric.getClass())) {
return List.of(type.cast(metric));
}
}
TagFilter filter = new TagFilter(pattern);
List<NBMetric> found = filter.filterLabeled(metrics.values());
List<T> foundAndMatching = new ArrayList<>();
for (NBMetric metric : found) {
if (type.isAssignableFrom(metric.getClass())) {
foundAndMatching.add(type.cast(metric));
}
}
return foundAndMatching;
}
@Override
public Collection<? extends NBMetric> getComponentMetrics() {
return metrics.values();

View File

@ -17,6 +17,7 @@
package io.nosqlbench.components;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.decorators.NBProviderSearch;
import java.util.List;
@ -33,7 +34,13 @@ import java.util.List;
*
* This interface includes more aspects of above by extension going forward.
*/
public interface NBComponent extends AutoCloseable, NBLabeledElement, NBComponentMetrics, NBComponentServices, NBComponentEvents {
public interface NBComponent extends
AutoCloseable,
NBLabeledElement,
NBComponentMetrics,
NBComponentServices,
NBComponentEvents,
NBProviderSearch {
NBComponent EMPTY_COMPONENT = new NBBaseComponent(null);

View File

@ -28,6 +28,10 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.function.BiFunction;
/**
* Since we like to use SPI/ServiceLoader, and we can't use the Provider-only interface as
* suggested in {@link ServiceLoader}, we have to indirect to the safe ctor from a parent context.
*/
public class NBComponentLoader {
public static <C extends NBComponent> C load(NBComponent parent, String selector, Class<C> clazz) {
ServiceLoader<C> loader = ServiceLoader.load(clazz);

View File

@ -50,5 +50,15 @@ public interface NBComponentMetrics {
return found.get(0);
}
default <T> T findOneComponentMetric(String pattern, Class<T> type) {
NBMetric found = findOneComponentMetric(pattern);
if (type.isAssignableFrom(found.getClass())) {
return type.cast(found);
}
return null;
}
Collection<? extends NBMetric> getComponentMetrics();
<T> Collection<? extends T> findComponentMetrics(String pattern, Class<T> type);
}

View File

@ -18,7 +18,7 @@ package io.nosqlbench.components;
public interface NBComponentServices {
public NBBuilders create();
public NBCreators create();
public NBFinders find();
}

View File

@ -35,32 +35,27 @@ import io.nosqlbench.api.optimizers.BobyqaOptimizerInstance;
import io.nosqlbench.api.files.FileAccess;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.shutdown.NBShutdownHook;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.BundledExtensions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.codahale.metrics.MetricAttribute;
import com.codahale.metrics.MetricFilter;
import io.nosqlbench.api.csvoutput.CsvOutputPluginWriter;
import io.nosqlbench.api.engine.metrics.reporters.*;
import io.nosqlbench.api.files.FileAccess;
import io.nosqlbench.api.optimizers.BobyqaOptimizerInstance;
import org.apache.logging.log4j.Marker;
import java.io.PrintStream;
import java.util.Set;
import java.util.*;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.function.Supplier;
public class NBBuilders {
public class NBCreators {
// TODO: add mandatory sanitize() around all label names and label "name" values
private final Logger logger = LogManager.getLogger(NBBuilders.class);
private final Logger logger = LogManager.getLogger(NBCreators.class);
private final NBBaseComponent base;
public NBBuilders(NBBaseComponent base) {
public NBCreators(NBBaseComponent base) {
this.base = base;
}
@ -398,4 +393,11 @@ public class NBBuilders {
}
}
public <T> T requiredExtension(String name, Class<T> type) {
return new BundledExtensions(base)
.load(name, type)
.orElseThrow(
() -> new RuntimeException("unable to load extension with name '" + name + "' and type '" + type.getSimpleName() + "'")
);
}
}

View File

@ -17,8 +17,10 @@
package io.nosqlbench.components;
import io.nosqlbench.api.engine.metrics.instruments.*;
import org.apache.commons.text.diff.StringsComparator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@ -45,14 +47,16 @@ public class NBFinders {
System.out.println(NBComponentFormats.formatAsTree(base));
throw new RuntimeException("unable to find metric with pattern '" + pattern + "'");
}
if (clazz.isAssignableFrom(found.getClass())) {
if (clazz.isInstance(found.getClass())) {
return clazz.cast(found);
} else if (clazz.isAssignableFrom(found.getClass())) {
return clazz.cast(found);
} else {
throw new RuntimeException(
"found metric with pattern '" + pattern + "'" +
", but it was type "
+ found.getClass().getSimpleName() + " (not a "
+ clazz.getSimpleName() +")"
+ clazz.getSimpleName() + ")"
);
}
}
@ -123,6 +127,20 @@ public class NBFinders {
return found;
}
private <T> List<T> metricsInTree(String pattern, Class<T> type) {
if (pattern.isEmpty()) {
throw new RuntimeException("non-empty predicate is required for this form. Perhaps you wanted metricsInTree()");
}
Iterator<NBComponent> tree = NBComponentTraversal.traverseBreadth(base);
List<T> found = new ArrayList<>();
while (tree.hasNext()) {
NBComponent c = tree.next();
found.addAll(c.findComponentMetrics(pattern,type));
}
return found;
}
private NBMetric oneMetricInTree(String pattern) {
List<NBMetric> found = metricsInTree(pattern);
if (found.size() != 1) {
@ -132,4 +150,13 @@ public class NBFinders {
return found.get(0);
}
public <T extends NBMetric> T topMetric(String pattern, Class<T> type) {
List<T> metrics = this.metricsInTree(pattern, type);
if (!metrics.isEmpty()) {
return metrics.get(0);
} else {
throw new RuntimeException("top metric not found: pattern='" +pattern +"', type='" + type.getSimpleName() + "'");
}
}
}

View File

@ -14,12 +14,10 @@
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.execution;
package io.nosqlbench.components.decorators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Optional;
public class Extensions extends ConcurrentHashMap<String,String> {
public static Extensions ofNone() {
return new Extensions();
}
public interface NBProviderSearch {
public <T> Optional<T> findParentService(Class<T> type);
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components.decorators;
import java.util.Map;
public interface NBTokenWords {
Map<String,String> getTokens();
}

View File

@ -27,7 +27,7 @@ import java.util.List;
public class DiagReader extends Reader {
Reader wrapped;
private final String prefix;
CharArrayWriter buffer = new CharArrayWriter(0);
final CharArrayWriter buffer;
private final List<String> timedLog = new ArrayList<String>();
private final DateTimeFormatter tsformat = DateTimeFormatter.ISO_DATE_TIME;
@ -37,17 +37,23 @@ public class DiagReader extends Reader {
public DiagReader(Reader wrapped, String prefix) {
this.wrapped = wrapped;
this.prefix = prefix;
this.buffer = new CharArrayWriter(0);
}
public DiagReader(Reader wrapped) {
this.wrapped = wrapped;
this.prefix = null;
this.buffer = null;
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
String tsprefix = LocalDateTime.now().format(tsformat);
int read = wrapped.read(cbuf, off, len);
buffer.write(cbuf, off, len);
timedLog.add(tsprefix + prefix + new String(cbuf, off, len));
if (buffer!=null) {
String tsprefix = LocalDateTime.now().format(tsformat);
buffer.write(cbuf, off, len);
timedLog.add(tsprefix + prefix + new String(cbuf, off, len));
}
return read;
}

View File

@ -36,13 +36,20 @@ import java.util.regex.Pattern;
public class DiagWriter extends PrintWriter {
private final DateTimeFormatter tsformat = DateTimeFormatter.ISO_DATE_TIME;
Writer wrapped;
InterjectingCharArrayWriter buffer;
public DiagWriter(Writer... writers) {
super(new FanWriter(writers));
this.wrapped = wrapped;
this.buffer = buffer;
public DiagWriter(InterjectingCharArrayWriter charbuffer, Writer writer) {
super(new FanWriter(charbuffer, writer));
this.buffer = charbuffer;
}
public DiagWriter(InterjectingCharArrayWriter charbuffer) {
super(charbuffer);
this.buffer = charbuffer;
}
public DiagWriter(PrintWriter out) {
super(out);
}
public String getBuf() {

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.api.extensions.ScriptingExtensionPluginInfo;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.Stream;
public class BundledExtensions {
private final static Logger logger = LogManager.getLogger(BundledExtensions.class);
private final NBComponent parent;
public BundledExtensions(NBComponent parent) {
this.parent = parent;
}
public static ScriptingExtensionPluginInfo<?>[] findAll() {
return ServiceLoader.load(ScriptingExtensionPluginInfo.class).stream()
.map(l -> l.get()).toArray(ScriptingExtensionPluginInfo[]::new);
}
public <T> Optional<T> load(String name, Class<T> type) {
ServiceLoader<ScriptingExtensionPluginInfo> loader = ServiceLoader.load(ScriptingExtensionPluginInfo.class);
return (Optional<T>) loader
.stream()
.filter(p -> {
return Arrays.stream(p.type().getAnnotationsByType(Service.class)).toList().get(0).selector().equals(name);
})
.map(p -> p.get().getExtensionObject(logger, parent))
.findAny()
.filter(raw -> type.isAssignableFrom(raw.getClass()))
.stream().findAny();
}
}

View File

@ -26,6 +26,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.regex.Pattern;
import static org.assertj.core.api.Assertions.assertThat;
@ -42,153 +43,164 @@ public class DirectRuntimeScenarioTests {
ScenariosResults results = executor.awaitAllResults();
System.out.println(results);
}
@Disabled("enable before merge")
@Test
public void testSC_activit_init_error() {
SC_start_stop_diag scenario = new SC_start_stop_diag(testC, "SC_start_stop_diag");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("SC_start_stop_diag"));
public void test_SC_linkedinput() {
NBScenario scenario = new SC_linkedinput(testC,"test_SC_linkedinput");
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
Pattern p = Pattern.compile(".*started leader.*started follower.*stopped leader.*stopped follower.*", Pattern.DOTALL);
assertThat(p.matcher(result.getIOLog()).matches()).isTrue();
}
@Test
public void testSC_activity_init_error() {
SC_activity_init_error scenario = new SC_activity_init_error(testC, "SC_activity_init_error");
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
assertThat(result.getException()).isNotNull();
assertThat(result.getException().getMessage()).contains("Unknown config parameter 'unknown_config'");
assertThat(result.getException()).isNotNull();
}
@Test
@Disabled("enable before merge")
public void test_SC_activity_error() {
NBScenario scenario = new SC_activity_error(testC,"test_SC_activity_error");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_activity_error"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
assertThat(result.getException()).isNotNull();
assertThat(result.getException().getMessage()).contains("For input string: \"unparsable\"");
}
@Disabled("enable before merge")
@Test
public void test_SC_activity_init_error() {
NBScenario scenario = new SC_activity_init_error(testC,"test_SC_activity_init_error");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_activity_init_error"));
}
@Disabled("enable before merge")
@Test
public void test_SC_await_finished() {
NBScenario scenario = new SC_await_finished(testC,"test_SC_await_finished");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_await_finished"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
assertThat(result.getIOLog()).contains("awaited activity");
}
@Disabled("enable before merge")
@Test
public void test_SC_basicdiag() {
NBScenario scenario = new SC_basicdiag(testC,"test_SC_basicdiag");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_basicdiag"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
assertThat(result.getIOLog().indexOf("starting activity basic_diag")).isGreaterThanOrEqualTo(0);
assertThat(result.getIOLog().indexOf("stopping activity basic_diag")).isGreaterThanOrEqualTo(1);
assertThat(result.getIOLog().indexOf("stopped activity basic_diag")).isGreaterThanOrEqualTo(2);
}
@Disabled("enable before merge")
@Test
public void test_SC_blockingrun() {
NBScenario scenario = new SC_blockingrun(testC,"test_SC_blockingrun");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_blockingrun"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
assertThat(result.getIOLog()).matches(Pattern.compile(".*running.*finished.*running.*finished.*",Pattern.DOTALL));
}
@Disabled("enable before merge")
@Test
public void test_SC_cocycledelay_bursty() {
NBScenario scenario = new SC_cocycledelay_bursty(testC,"test_SC_cocycledelay_bursty");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_cocycledelay_bursty"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
result.report();
}
@Disabled("enable before merge")
@Test
public void test_SC_cocycledelay_strict() {
NBScenario scenario = new SC_cocycledelay_strict(testC,"test_SC_cocycledelay_strict");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_cocycledelay_strict"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_cycle_rate() {
NBScenario scenario = new SC_cycle_rate(testC,"test_SC_cycle_rate");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_cycle_rate"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_cycle_rate_change() {
NBScenario scenario = new SC_cycle_rate_change(testC,"test_SC_cycle_rate_change");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_cycle_rate_change"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_extension_csvmetrics() {
NBScenario scenario = new SC_extension_csvmetrics(testC,"test_SC_extension_csvmetrics");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_extension_csvmetrics"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_extension_csvoutput() {
NBScenario scenario = new SC_extension_csvoutput(testC,"test_SC_extension_csvoutput");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_extension_csvoutput"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_extension_histostatslogger() {
NBScenario scenario = new SC_extension_histostatslogger(testC,"test_SC_extension_histostatslogger");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_extension_histostatslogger"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_extension_shutdown_hook() {
NBScenario scenario = new SC_extension_shutdown_hook(testC,"test_SC_extension_shutdown_hook");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_extension_shutdown_hook"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Test
public void test_SC_extension_example() {
NBScenario scenario = new SC_extension_example(testC,"test_SC_extension_example");
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
// result.exitWithCode();
assertThat(result.getIOLog()).contains("3+5=8");
}
@Disabled("enable before merge")
@Test
public void test_SC_histologger() {
NBScenario scenario = new SC_histologger(testC,"test_SC_histologger");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_histologger"));
}
@Disabled("enable before merge")
@Test
public void test_SC_linkedinput() {
NBScenario scenario = new SC_linkedinput(testC,"test_SC_linkedinput");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_linkedinput"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_optimo() {
NBScenario scenario = new SC_optimo_test(testC,"test_SC_optimo");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_optimo"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
System.out.println(result);
}
@Disabled("enable before merge")
@Test
public void test_SC_params_variable() {
NBScenario scenario = new SC_params_variable(testC,"test_SC_params_variable");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_params_variable"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_readmetrics() {
NBScenario scenario = new SC_readmetrics(testC,"test_SC_readmetrics");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_readmetrics"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_speedcheck() {
NBScenario scenario = new SC_speedcheck(testC,"test_SC_speedcheck");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_speedcheck"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_start_stop_diag() {
NBScenario scenario = new SC_start_stop_diag(testC,"test_SC_start_stop_diag");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_start_stop_diag"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_threadchange() {
NBScenario scenario = new SC_threadchange(testC,"test_SC_threadchange");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_threadchange"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_threadspeeds() {
NBScenario scenario = new SC_threadspeeds(testC,"test_SC_threadspeeds");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_threadspeeds"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
}
@Disabled("enable before merge")
@Test
public void test_SC_undef_param() {
NBScenario scenario = new SC_undef_param(testC, "test_SC_undef_param");
ScenarioResult result = scenario.apply(NBSceneBuffer.init("test_SC_undef_param"));
ScenarioResult result = scenario.apply(NBSceneBuffer.traced(scenario));
String out = result.getIOLog();
assertThat(out).contains("foobar");
}

View File

@ -44,5 +44,8 @@ public class SC_basicdiag extends SCBaseScenario {
);
stdout.println("starting activity basic_diag");
controller.start(basic_diag);
stdout.println("stopping activity basic_diag");
controller.stop(basic_diag);
stdout.println("stopped activity basic_diag");
}
}

View File

@ -57,13 +57,13 @@ public class SC_blockingrun extends SCBaseScenario {
public void invoke() {
var activitydef1 = Map.of(
"alias","blockactivity1","driver","diag",
"cycles","0..100000","threads","1",
"cycles","0..10000","threads","1",
"interval","2000","op","noop"
);
var activitydef2 = Map.of(
"alias", "blockingactivity2","driver","diag",
"cycles","0..100000","threads","1",
"cycles","0..10000","threads","1",
"interval","2000", "op","noop"
);

View File

@ -18,7 +18,11 @@ package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.events.ParamChange;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import java.util.Map;
@ -81,68 +85,60 @@ public class SC_cocycledelay_bursty extends SCBaseScenario {
*/
@Override
public void invoke() {
int diagrate = 500;
var co_cycle_delay_bursty = Map.of(
"alias", "co_cycle_delay_bursty",
"driver", "diag",
"cycles", "0..1000000",
"threads", "1",
"cyclerate", "10,1.5",
"op", "log: level=info"
"cyclerate", "1000,1.5",
"op", "diagrate: diagrate=" + diagrate
// "dryrun", "op" // silent
);
controller.waitMillis(1000);
controller.waitMillis(500);
stdout.println("starting activity co_cycle_delay_bursty");
controller.start(co_cycle_delay_bursty);
Activity activity = controller.start(co_cycle_delay_bursty);
controller.waitMillis(1000);
NBMetricTimer service_time_counter = find().topMetric("activity=co_cycle_delay_bursty,name=cycles_servicetime", NBMetricTimer.class);
NBMetricGauge wait_time_gauge = find().topMetric("activity=co_cycle_delay_bursty,name=cycles_waittime",NBMetricGauge.class);
for (int i = 0; i < 5; i++) {
controller.waitMillis(1000);
if (!controller.isRunningActivity("co_cycle_delay_bursty")) {
throw new RuntimeException("Scenario exited prematurely.");
}
stdout.println("backlogging, cycles=" + service_time_counter.getCount() +
" waittime=" + wait_time_gauge.getValue() +
" diagrate=" + diagrate +
" cyclerate=" + activity.getCycleLimiter().getSpec()
);
}
stdout.println("step1 waittime=" + wait_time_gauge.getValue());
onEvent(new ParamChange<>(new CycleRateSpec(10000, 1.1)));
for (int i = 0; i < 10; i++) {
if (!controller.isRunningActivity("co_cycle_delay_bursty")) {
throw new RuntimeException("scenario exited prematurely.");
}
stdout.println("backlogging, cycles=" + service_time_counter.getCount() +
" waittime=" + wait_time_gauge.getValue() +
" diagrate=" + diagrate +
" cyclerate=" + activity.getCycleLimiter().getSpec()
);
if (wait_time_gauge.getValue() < 50000000) {
stdout.println("waittime trended back down as expected, exiting on iteration " + i);
break;
}
}
stdout.println("step2 metrics.waittime=" + wait_time_gauge.getValue());
controller.stop(co_cycle_delay_bursty);
stdout.println("stopped activity co_cycle_delay_bursty");
NBMetricCounter service_time_counter = find().counter("activity=co_cycle_delay_bursty,name=cycles_servicetime");
NBMetricGauge wait_time_gauge = find().gauge("activity=co_cycle_delay_bursty,name=cycles_waittime");
String diagrate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("diagrate").toString();
String cyclerate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("cyclerate").toString();
//
// for (int i = 0; i < 5; i++) {
// controller.waitMillis(1000);
// if (!controller.isRunningActivity(co_cycle_delay_bursty)) {
// stdout.println("scenario exited prematurely, aborting.");
// break;
// }
// diagrate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("diagrate").toString();
// cyclerate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("cyclerate").toString();
// stdout.println(
// "backlogging, cycles=" + service_time_counter.getCount() +
// " waittime=" + wait_time_gauge.getValue() +
// " diagrate=" + diagrate +
// " cyclerate=" + cyclerate
// );
// }
//
// stdout.println("step1 metrics.waittime=" + wait_time_gauge.getValue());
// controller.getActivityDef("co_cycle_delay_bursty").getParams().put("diagrate", "10000");
//
// for (int i = 0; i < 10; i++) {
// if (!controller.isRunningActivity("co_cycle_delay_bursty")) {
// stdout.println("scenario exited prematurely, aborting.");
// break;
// }
// diagrate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("diagrate").toString();
// cyclerate = controller.getActivityDef("co_cycle_delay_bursty").getParams().get("cyclerate").toString();
//
// stdout.println(
// "recovering, cycles=" + service_time_counter.getCount() +
// " waittime=" + wait_time_gauge.getValue() +
// " diagrate=" + diagrate +
// " cyclerate=" + cyclerate
// );
//
// controller.waitMillis(1000);
// if (wait_time_gauge.getValue() < 50000000) {
// stdout.println("waittime trended back down as expected, exiting on iteration " + i);
// break;
// }
// }
//
// stdout.println("step2 metrics.waittime=" + wait_time_gauge.getValue());
// controller.stop(co_cycle_delay_bursty);
//
// stdout.println("stopped activity co_cycle_delay_bursty");
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import io.nosqlbench.engine.extensions.example.ExamplePlugin;
public class SC_extension_example extends SCBaseScenario {
public SC_extension_example(NBComponent parentComponent, String scenarioName) {
super(parentComponent, scenarioName);
}
/** <pre>{@code
* var csvlogger = csvoutput.open("logs/csvoutputtestfile.csv","header1","header2");
*
* csvlogger.write({"header1": "value1","header2":"value2"});
* }</pre>
*/
@Override
public void invoke() {
ExamplePlugin examplePlugin = create().requiredExtension("example", ExamplePlugin.class);
long sum = examplePlugin.getSum(3, 5);
stdout.println("3+5=" + sum);
}
}

View File

@ -19,12 +19,15 @@ package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import java.util.Map;
public class SC_linkedinput extends SCBaseScenario {
public SC_linkedinput(NBComponent parentComponent, String scenarioName) {
super(parentComponent, scenarioName);
}
/** <pre>{@code
/**
* <pre>{@code
* var leader = {
* driver: 'diag',
* alias: 'leader',
@ -55,5 +58,30 @@ public class SC_linkedinput extends SCBaseScenario {
@Override
public void invoke() {
var leader = Map.of(
"driver", "diag",
"alias", "leader",
"targetrate", "10000",
"op", "log:level=info"
);
var follower = Map.of(
"driver", "diag",
"alias", "follower",
"op", "log:level=INFO"
);
controller.start(leader);
stdout.println("started leader");
controller.start(follower);
stdout.println("started follower");
controller.waitMillis(500);
controller.stop(leader);
stdout.println("stopped leader");
controller.stop(follower);
stdout.println("stopped follower");
}
}

View File

@ -55,7 +55,7 @@ public class PerfWindowSampler {
}
double getValue() {
if (windows.size()==0) {
if (windows.size() == 0) {
return Double.NaN;
}
return windows.getLast().value();
@ -64,15 +64,16 @@ public class PerfWindowSampler {
@Override
public String toString() {
StringBuilder sb = new StringBuilder("PERF VALUE=").append(getValue()).append("\n");
sb.append("windows:\n"+windows.getLast().toString());
sb.append("windows:\n" + windows.getLast().toString());
return sb.toString();
}
public void startWindow() {
startWindow(System.currentTimeMillis());
}
public void startWindow(long now) {
if (window!=null) {
if (window != null) {
throw new RuntimeException("cant start window twice in a row. Must close window first");
}
List<ParamSample> samples = criteria.stream().map(c -> ParamSample.init(c).start(now)).toList();
@ -82,12 +83,13 @@ public class PerfWindowSampler {
public void stopWindow() {
stopWindow(System.currentTimeMillis());
}
public void stopWindow(long now) {
for (int i = 0; i < window.size(); i++) {
window.set(i,window.get(i).stop(now));
window.set(i, window.get(i).stop(now));
}
windows.add(window);
window=null;
window = null;
}
public static record Criterion(
@ -96,18 +98,27 @@ public class PerfWindowSampler {
double weight,
Runnable callback,
boolean delta
) { }
) {
}
public static class WindowSamples extends ArrayList<WindowSample> {
}
public static class WindowSamples extends ArrayList<WindowSample> {}
public static class WindowSample extends ArrayList<ParamSample> {
public WindowSample(List<ParamSample> samples) {
super(samples);
}
public double value() {
return stream().mapToDouble(ParamSample::weightedValue).sum();
double product = 1.0;
for (ParamSample sample : this) {
product *= sample.weightedValue();
}
return product;
}
}
public static record ParamSample(Criterion criterion, long startAt, long endAt, double startval, double endval) {
public double weightedValue() {
return rawValue() * criterion().weight;
@ -121,24 +132,26 @@ public class PerfWindowSampler {
}
private double rate() {
return rawValue()/seconds();
return rawValue() / seconds();
}
private double seconds() {
return ((double)(endAt-startAt)) / 1000d;
return ((double) (endAt - startAt)) / 1000d;
}
public static ParamSample init(Criterion criterion) {
return new ParamSample(criterion, 0,0,Double.NaN, Double.NaN);
return new ParamSample(criterion, 0, 0, Double.NaN, Double.NaN);
}
public ParamSample start(long startTime) {
criterion.callback.run();
double v1 = criterion.supplier.getAsDouble();
return new ParamSample(criterion,startTime,0L, v1, Double.NaN);
return new ParamSample(criterion, startTime, 0L, v1, Double.NaN);
}
public ParamSample stop(long stopTime) {
double v2 = criterion.supplier.getAsDouble();
return new ParamSample(criterion,startAt,stopTime,startval, v2);
return new ParamSample(criterion, startAt, stopTime, startval, v2);
}
@Override

View File

@ -60,8 +60,8 @@ class PerfWindowSamplerTest {
assertThat(value).isCloseTo(30.0,Offset.offset(0.001));
pws.startWindow(10000L);
a1.set(42); // 42-10=32
a2.set(42); // 42-1=41; 41+32=73
a1.set(42); // 42-3=39
a2.set(42); // 42-10=32
pws.stopWindow(11000L);
double value2 = pws.getValue();