more merge fixes

This commit is contained in:
Jonathan Shook 2023-09-29 18:58:32 -05:00
parent 02f69d0038
commit c0a3ec2509
14 changed files with 157 additions and 132 deletions

View File

@ -17,18 +17,18 @@
package io.nosqlbench.engine.api.activityapi.core; package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable; import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -38,7 +38,7 @@ import java.util.function.Supplier;
* Provides the components needed to build and run an activity a runtime. * Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link SimpleActivity}. * The easiest way to build a useful Activity is to extend {@link SimpleActivity}.
*/ */
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBLabeledElement { public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {
/** /**
* Provide the activity with the controls needed to stop itself. * Provide the activity with the controls needed to stop itself.

View File

@ -16,14 +16,14 @@
package io.nosqlbench.engine.api.activityapi.core; package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices; import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -36,7 +36,7 @@ import java.util.Optional;
* an action dispenser. Default implementations of input and motor dispensers are provided, * an action dispenser. Default implementations of input and motor dispensers are provided,
* and by extension, default inputs and motors.</p> * and by extension, default inputs and motors.</p>
*/ */
@Deprecated(forRemoval = true,since = "5.0") //@Deprecated(forRemoval = true,since = "5.0")
public interface ActivityType<A extends Activity> { public interface ActivityType<A extends Activity> {
@ -47,8 +47,8 @@ public interface ActivityType<A extends Activity> {
* @return a distinct Activity instance for each call * @return a distinct Activity instance for each call
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
default A getActivity(final ActivityDef activityDef, final NBLabeledElement parentLabels) { default A getActivity(final ActivityDef activityDef, final NBComponent parent) {
final SimpleActivity activity = new SimpleActivity(activityDef, parentLabels); final SimpleActivity activity = new SimpleActivity(parent,activityDef);
return (A) activity; return (A) activity;
} }
@ -60,8 +60,8 @@ public interface ActivityType<A extends Activity> {
* @param activities a map of existing activities * @param activities a map of existing activities
* @return a distinct activity instance for each call * @return a distinct activity instance for each call
*/ */
default Activity getAssembledActivity(final ActivityDef activityDef, final Map<String, Activity> activities, final NBLabeledElement labels) { default Activity getAssembledActivity(final ActivityDef activityDef, final Map<String, Activity> activities, final NBComponent parent) {
final A activity = this.getActivity(activityDef, labels); final A activity = this.getActivity(activityDef, parent);
final InputDispenser inputDispenser = this.getInputDispenser(activity); final InputDispenser inputDispenser = this.getInputDispenser(activity);
if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities); if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities);

View File

@ -17,7 +17,8 @@
package io.nosqlbench.engine.api.activityimpl; package io.nosqlbench.engine.api.activityimpl;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.api.labels.NBLabelSpec; import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.config.standard.NBBaseComponent;
import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter; import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
@ -29,7 +30,6 @@ import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec; import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser; import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper; import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels; import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
@ -67,9 +67,8 @@ import java.util.stream.Collectors;
/** /**
* A default implementation of an Activity, suitable for building upon. * A default implementation of an Activity, suitable for building upon.
*/ */
public class SimpleActivity implements Activity { public class SimpleActivity extends NBBaseComponent implements Activity {
private static final Logger logger = LogManager.getLogger("ACTIVITY"); private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final NBLabeledElement parentLabels;
protected ActivityDef activityDef; protected ActivityDef activityDef;
private final List<AutoCloseable> closeables = new ArrayList<>(); private final List<AutoCloseable> closeables = new ArrayList<>();
@ -91,18 +90,10 @@ public class SimpleActivity implements Activity {
private ActivityMetricProgressMeter progressMeter; private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified"; private String workloadSource = "unspecified";
private final RunStateTally tally = new RunStateTally(); private final RunStateTally tally = new RunStateTally();
private final NBLabels labels;
public SimpleActivity(ActivityDef activityDef, NBLabeledElement parentLabels) { public SimpleActivity(NBComponent parent, ActivityDef activityDef) {
NBLabels activityLabels = parentLabels.getLabels() super(parent,NBLabels.forKV("activity",activityDef.getAlias()).and(activityDef.auxLabels()));
.and("activity", activityDef.getAlias());
Optional<String> auxLabelSpec = activityDef.getParams().getOptionalString("labels");
if (auxLabelSpec.isPresent()) {
activityLabels = activityLabels.and(NBLabelSpec.parseLabels(auxLabelSpec.get()));
}
this.labels = activityLabels;
this.activityDef = activityDef; this.activityDef = activityDef;
this.parentLabels = parentLabels;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) { if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString( Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload", "workload",
@ -119,8 +110,8 @@ public class SimpleActivity implements Activity {
} }
} }
public SimpleActivity(String activityDefString, NBLabeledElement parentLabels) { public SimpleActivity(NBComponent parent, String activityDefString) {
this(ActivityDef.parseActivityDef(activityDefString), parentLabels); this(parent,ActivityDef.parseActivityDef(activityDefString));
} }
@Override @Override
@ -706,8 +697,4 @@ public class SimpleActivity implements Activity {
return tally; return tally;
} }
@Override
public NBLabels getLabels() {
return this.labels;
}
} }

View File

@ -27,14 +27,14 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTem
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp; import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.api.Shutdownable; import io.nosqlbench.api.Shutdownable;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.config.standard.*; import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics; import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge; import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.errors.OpConfigError; import io.nosqlbench.api.errors.OpConfigError;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.annotations.ServiceSelector;
@ -64,8 +64,8 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
private final Gauge<Double> activeOpsGauge; private final Gauge<Double> activeOpsGauge;
private final Gauge<Double> completeOpsGauge; private final Gauge<Double> completeOpsGauge;
public StandardActivity(ActivityDef activityDef, NBLabeledElement parentLabels) { public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(activityDef, parentLabels); super(parent,activityDef);
OpsDocList workload; OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");

View File

@ -16,16 +16,16 @@
package io.nosqlbench.engine.api.activityimpl.uniform; package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable; import io.nosqlbench.api.config.standard.NBReconfigurable;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType; import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -40,26 +40,25 @@ public class StandardActivityType<A extends StandardActivity<?,?>> extends Simpl
private static final Logger logger = LogManager.getLogger("ACTIVITY"); private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final Map<String, DriverAdapter> adapters = new HashMap<>(); private final Map<String, DriverAdapter> adapters = new HashMap<>();
public StandardActivityType(final DriverAdapter<?,?> adapter, final ActivityDef activityDef, final NBLabeledElement parentLabels) { public StandardActivityType(final DriverAdapter<?,?> adapter, final ActivityDef activityDef, final NBComponent parent) {
super(activityDef super(parent,activityDef
.deprecate("type","driver") .deprecate("type","driver")
.deprecate("yaml", "workload"), .deprecate("yaml", "workload")
parentLabels
); );
adapters.put(adapter.getAdapterName(),adapter); adapters.put(adapter.getAdapterName(),adapter);
if (adapter instanceof ActivityDefAware) ((ActivityDefAware) adapter).setActivityDef(activityDef); if (adapter instanceof ActivityDefAware) ((ActivityDefAware) adapter).setActivityDef(activityDef);
} }
public StandardActivityType(final ActivityDef activityDef, final NBLabeledElement parentLabels) { public StandardActivityType(final ActivityDef activityDef, final NBComponent parent) {
super(activityDef, parentLabels); super(parent,activityDef);
} }
@Override @Override
public A getActivity(final ActivityDef activityDef, final NBLabeledElement parentLabels) { public A getActivity(final ActivityDef activityDef, final NBComponent parent) {
if (activityDef.getParams().getOptionalString("async").isPresent()) if (activityDef.getParams().getOptionalString("async").isPresent())
throw new RuntimeException("This driver does not support async mode yet."); throw new RuntimeException("This driver does not support async mode yet.");
return (A) new StandardActivity(activityDef, parentLabels); return (A) new StandardActivity(parent, activityDef);
} }
@Override @Override

View File

@ -16,7 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity; package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
@ -41,9 +41,9 @@ public class ActivityLoader {
this.scenario = scenario; this.scenario = scenario;
} }
public synchronized Activity loadActivity(ActivityDef activityDef, final NBLabeledElement labels) { public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) {
activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver"); activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver");
final Activity activity = new StandardActivityType(activityDef, labels).getAssembledActivity(activityDef, this.activityMap, labels); final Activity activity = new StandardActivityType(activityDef, parent).getAssembledActivity(activityDef, this.activityMap, parent);
this.activityMap.put(activity.getAlias(),activity); this.activityMap.put(activity.getAlias(),activity);
ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias()); ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias());
return activity; return activity;

View File

@ -16,17 +16,17 @@
package io.nosqlbench.engine.core.lifecycle.activity; package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.api.config.NBComponent;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.api.system.NBEnvironment;
import io.nosqlbench.api.content.Content; import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO; import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.spi.SimpleServiceLoader; import io.nosqlbench.api.spi.SimpleServiceLoader;
import io.nosqlbench.api.system.NBEnvironment;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -118,7 +118,7 @@ public class ActivityTypeLoader {
return urlsToAdd; return urlsToAdd;
} }
public Optional<ActivityType> load(final ActivityDef activityDef, final NBLabeledElement labels) { public Optional<ActivityType> load(final ActivityDef activityDef, final NBComponent parent) {
String driverName = activityDef.getParams() String driverName = activityDef.getParams()
.getOptionalString("driver", "type") .getOptionalString("driver", "type")
@ -135,18 +135,18 @@ public class ActivityTypeLoader {
}) })
.ifPresent(this::extendClassLoader); .ifPresent(this::extendClassLoader);
return getDriverAdapter(driverName,activityDef,labels) return getDriverAdapter(driverName,activityDef,parent)
.or(() -> this.ACTIVITYTYPE_SPI_FINDER.getOptionally(driverName)); .or(() -> this.ACTIVITYTYPE_SPI_FINDER.getOptionally(driverName));
} }
private Optional<ActivityType> getDriverAdapter(final String activityTypeName, final ActivityDef activityDef, final NBLabeledElement labels) { private Optional<ActivityType> getDriverAdapter(final String activityTypeName, final ActivityDef activityDef, final NBComponent parent) {
final Optional<DriverAdapter> oda = this.DRIVERADAPTER_SPI_FINDER.getOptionally(activityTypeName); final Optional<DriverAdapter> oda = this.DRIVERADAPTER_SPI_FINDER.getOptionally(activityTypeName);
if (oda.isPresent()) { if (oda.isPresent()) {
final DriverAdapter<?, ?> driverAdapter = oda.get(); final DriverAdapter<?, ?> driverAdapter = oda.get();
final ActivityType activityType = new StandardActivityType<>(driverAdapter, activityDef, labels); final ActivityType activityType = new StandardActivityType<>(driverAdapter, activityDef, parent);
return Optional.of(activityType); return Optional.of(activityType);
} }
return Optional.empty(); return Optional.empty();

View File

@ -74,7 +74,7 @@ public class ScenarioController implements NBLabeledElement {
private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) { private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) { if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this); Activity activity = this.activityLoader.loadActivity(activityDef, scenario);
ActivityExecutor executor = new ActivityExecutor(activity, this.scenario.getScenarioName()); ActivityExecutor executor = new ActivityExecutor(activity, this.scenario.getScenarioName());
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor); Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor); ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);

View File

@ -16,20 +16,20 @@
package io.nosqlbench.engine.core.lifecycle.scenario.script; package io.nosqlbench.engine.core.lifecycle.scenario.script;
import com.codahale.metrics.*; import com.codahale.metrics.*;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics; import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader; import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings; import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.*; import java.util.*;
import java.util.Timer;
import java.util.Map.Entry;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -68,11 +68,11 @@ public enum MetricsMapper {
final ActivityDef activityDef = ActivityDef.parseActivityDef(activitySpec); final ActivityDef activityDef = ActivityDef.parseActivityDef(activitySpec);
MetricsMapper.logger.info(() -> "introspecting metric names for " + activitySpec); MetricsMapper.logger.info(() -> "introspecting metric names for " + activitySpec);
final Optional<ActivityType> activityType = new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY); final Optional<ActivityType> activityType = new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
if (!activityType.isPresent()) if (!activityType.isPresent())
throw new RuntimeException("Activity type '" + activityDef.getActivityType() + "' does not exist in this runtime."); throw new RuntimeException("Activity type '" + activityDef.getActivityType() + "' does not exist in this runtime.");
final Activity activity = activityType.get().getAssembledActivity(activityDef, new HashMap<>(), NBLabeledElement.EMPTY); final Activity activity = activityType.get().getAssembledActivity(activityDef, new HashMap<>(), TestComponent.INSTANCE);
final PolyglotMetricRegistryBindings nashornMetricRegistryBindings = new PolyglotMetricRegistryBindings(ActivityMetrics.getMetricRegistry()); final PolyglotMetricRegistryBindings nashornMetricRegistryBindings = new PolyglotMetricRegistryBindings(ActivityMetrics.getMetricRegistry());
activity.initActivity(); activity.initActivity();
activity.getInputDispenserDelegate().getInput(0); activity.getInputDispenserDelegate().getInput(0);

View File

@ -16,14 +16,14 @@
package io.nosqlbench.engine.core.metadata; package io.nosqlbench.engine.core.metadata;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.api.content.Content; import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO; import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -64,7 +64,7 @@ public class MarkdownFinder {
} }
public String forActivityInstance(final String s) { public String forActivityInstance(final String s) {
final ActivityType activityType = new ActivityTypeLoader().load(ActivityDef.parseActivityDef("driver="+s), NBLabeledElement.EMPTY).orElseThrow( final ActivityType activityType = new ActivityTypeLoader().load(ActivityDef.parseActivityDef("driver="+s), TestComponent.INSTANCE).orElseThrow(
() -> new BasicError("Unable to find driver for '" + s + '\'') () -> new BasicError("Unable to find driver for '" + s + '\'')
); );
return this.forResourceMarkdown(activityType.getClass().getAnnotation(Service.class) return this.forResourceMarkdown(activityType.getClass().getAnnotation(Service.class)

View File

@ -18,16 +18,16 @@ package io.nosqlbench.engine.core;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices; import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser; import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult; import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor; import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor;
@ -36,9 +36,22 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.fail;
@ -83,49 +96,49 @@ class ActivityExecutorTest {
// //
// } // }
// @Test @Test
// synchronized void testDelayedStartSanity() { synchronized void testDelayedStartSanity() {
//
// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
// new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY); new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
//
// Activity activity = new DelayedInitActivity(activityDef); Activity activity = new DelayedInitActivity(activityDef);
// final InputDispenser inputDispenser = new CoreInputDispenser(activity); final InputDispenser inputDispenser = new CoreInputDispenser(activity);
// final ActionDispenser actionDispenser = new CoreActionDispenser(activity); final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
// final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
//
// MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
// activity.setActionDispenserDelegate(actionDispenser); activity.setActionDispenserDelegate(actionDispenser);
// activity.setOutputDispenserDelegate(outputDispenser); activity.setOutputDispenserDelegate(outputDispenser);
// activity.setInputDispenserDelegate(inputDispenser); activity.setInputDispenserDelegate(inputDispenser);
// activity.setMotorDispenserDelegate(motorDispenser); activity.setMotorDispenserDelegate(motorDispenser);
//
// ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start"); ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
//
// ExecutorService testExecutor = Executors.newCachedThreadPool(); ExecutorService testExecutor = Executors.newCachedThreadPool();
// Future<ExecutionResult> future = testExecutor.submit(activityExecutor); Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
//
//
// try { try {
// activityDef.setThreads(1); activityDef.setThreads(1);
// activityExecutor.startActivity(); activityExecutor.startActivity();
// future.get(); future.get();
// testExecutor.shutdownNow(); testExecutor.shutdownNow();
//
// } catch (final Exception e) { } catch (final Exception e) {
// fail("Unexpected exception", e); fail("Unexpected exception", e);
// } }
//
// assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
// } }
@Test @Test
synchronized void testNewActivityExecutor() { synchronized void testNewActivityExecutor() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;"); final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef,NBLabeledElement.EMPTY); new ActivityTypeLoader().load(activityDef,TestComponent.INSTANCE);
Activity simpleActivity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of())); Activity simpleActivity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef)); // this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
@ -171,6 +184,18 @@ class ActivityExecutorTest {
} }
} }
private MotorDispenser<?> getActivityMotorFactory(final Action lc, Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
final Activity activity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
return cm;
}
};
}
private SyncAction motorActionDelay(long delay) { private SyncAction motorActionDelay(long delay) {
return new SyncAction() { return new SyncAction() {
@Override @Override
@ -190,7 +215,7 @@ class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class); private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class);
public DelayedInitActivity(final ActivityDef activityDef) { public DelayedInitActivity(final ActivityDef activityDef) {
super(activityDef, NBLabeledElement.EMPTY); super(TestComponent.INSTANCE,activityDef);
} }
@Override @Override

View File

@ -16,7 +16,7 @@
package io.nosqlbench.engine.core; package io.nosqlbench.engine.core;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Action; import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Activity;
@ -27,7 +27,6 @@ import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput; import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -39,9 +38,9 @@ public class CoreMotorTest {
@Test @Test
public void testBasicActivityMotor() { public void testBasicActivityMotor() {
final Activity activity = new SimpleActivity( final Activity activity = new SimpleActivity(
ActivityDef.parseActivityDef("alias=foo"), new TestComponent("testing", "coremotor"),
NBLabeledElement.forMap(Map.of("testing","coremotor")) ActivityDef.parseActivityDef("alias=foo")
); );
final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm = new CoreMotor(activity, 5L, lockstepper); final Motor cm = new CoreMotor(activity, 5L, lockstepper);
final AtomicLong observableAction = new AtomicLong(-3L); final AtomicLong observableAction = new AtomicLong(-3L);
@ -51,18 +50,19 @@ public class CoreMotorTest {
t.start(); t.start();
try { try {
Thread.sleep(1000); // allow action time to be waiting in monitor for test fixture Thread.sleep(1000); // allow action time to be waiting in monitor for test fixture
} catch (final InterruptedException ignored) {} } catch (final InterruptedException ignored) {
}
lockstepper.publishSegment(5L); lockstepper.publishSegment(5L);
final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(),observableAction,5000,100); final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100);
assertThat(observableAction.get()).isEqualTo(5L); assertThat(observableAction.get()).isEqualTo(5L);
} }
@Test @Test
public void testIteratorStride() { public void testIteratorStride() {
SimpleActivity activity = new SimpleActivity("stride=3", NBLabeledElement.EMPTY); SimpleActivity activity = new SimpleActivity(TestComponent.INSTANCE, "stride=3");
final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm1 = new CoreMotor(activity,1L, lockstepper); final Motor cm1 = new CoreMotor(activity, 1L, lockstepper);
final AtomicLongArray ary = new AtomicLongArray(10); final AtomicLongArray ary = new AtomicLongArray(10);
final Action a1 = this.getTestArrayConsumer(ary); final Action a1 = this.getTestArrayConsumer(ary);
cm1.setAction(a1); cm1.setAction(a1);
@ -72,11 +72,12 @@ public class CoreMotorTest {
t1.start(); t1.start();
try { try {
Thread.sleep(500); // allow action time to be waiting in monitor for test fixture Thread.sleep(500); // allow action time to be waiting in monitor for test fixture
} catch (final InterruptedException ignored) {} } catch (final InterruptedException ignored) {
}
lockstepper.publishSegment(11L,12L,13L); lockstepper.publishSegment(11L, 12L, 13L);
final boolean result = this.awaitAryCondition(ala -> 13L == ala.get(2),ary,5000,100); final boolean result = this.awaitAryCondition(ala -> 13L == ala.get(2), ary, 5000, 100);
assertThat(ary.get(0)).isEqualTo(11L); assertThat(ary.get(0)).isEqualTo(11L);
assertThat(ary.get(1)).isEqualTo(12L); assertThat(ary.get(1)).isEqualTo(12L);
assertThat(ary.get(2)).isEqualTo(13L); assertThat(ary.get(2)).isEqualTo(13L);
@ -87,6 +88,7 @@ public class CoreMotorTest {
private SyncAction getTestArrayConsumer(AtomicLongArray ary) { private SyncAction getTestArrayConsumer(AtomicLongArray ary) {
return new SyncAction() { return new SyncAction() {
private int offset; private int offset;
@Override @Override
public int runCycle(final long cycle) { public int runCycle(final long cycle) {
ary.set(this.offset, cycle); ary.set(this.offset, cycle);
@ -95,6 +97,7 @@ public class CoreMotorTest {
} }
}; };
} }
private SyncAction getTestConsumer(AtomicLong atomicLong) { private SyncAction getTestConsumer(AtomicLong atomicLong) {
return new SyncAction() { return new SyncAction() {
@Override @Override
@ -108,7 +111,7 @@ public class CoreMotorTest {
private boolean awaitAryCondition(final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) { private boolean awaitAryCondition(final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
long now=start; long now = start;
while (now < (start + millis)) { while (now < (start + millis)) {
final boolean result = atomicLongAryPredicate.test(ary); final boolean result = atomicLongAryPredicate.test(ary);
if (result) return true; if (result) return true;
@ -123,7 +126,7 @@ public class CoreMotorTest {
private boolean awaitCondition(final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) { private boolean awaitCondition(final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
long now=start; long now = start;
while (now < (start + millis)) { while (now < (start + millis)) {
final boolean result = atomicPredicate.test(atomicInteger); final boolean result = atomicPredicate.test(atomicInteger);
if (result) return true; if (result) return true;

View File

@ -21,6 +21,7 @@ import io.nosqlbench.api.labels.NBLabels;
public class TestComponent implements NBComponent { public class TestComponent implements NBComponent {
public static final NBComponent INSTANCE = new TestComponent();
private final NBLabels labels; private final NBLabels labels;
public TestComponent(String... labels) { public TestComponent(String... labels) {

View File

@ -18,6 +18,8 @@ package io.nosqlbench.api.engine.activityimpl;
import io.nosqlbench.api.config.NBNamedElement; import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.labels.NBLabelSpec;
import io.nosqlbench.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -220,4 +222,12 @@ public class ActivityDef implements NBNamedElement {
return this; return this;
} }
public NBLabels auxLabels() {
Optional<String> auxLabelSpec = getParams().getOptionalString("labels");
if (auxLabelSpec.isPresent()) {
return NBLabelSpec.parseLabels(auxLabelSpec.get());
}
return NBLabels.forKV();
}
} }