mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
enable composable diag tasks as sequence of commands
This commit is contained in:
parent
6dd1c43db3
commit
5b8ed36f49
@ -21,34 +21,79 @@ import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.params.NBParams;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.NBReconfigurable;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Service(value=DriverAdapter.class,selector="diag")
|
||||
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp,DiagSpace> {
|
||||
@Service(value = DriverAdapter.class, selector = "diag")
|
||||
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(DiagDriverAdapter.class);
|
||||
private DiagOpMapper mapper;
|
||||
|
||||
public DiagDriverAdapter() {
|
||||
logger.debug("starting up");
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpMapper<DiagOp> getOpMapper() {
|
||||
return new DiagOpMapper();
|
||||
public synchronized OpMapper<DiagOp> getOpMapper() {
|
||||
if (this.mapper == null) {
|
||||
this.mapper = new DiagOpMapper(getSpaceCache());
|
||||
}
|
||||
return this.mapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Function<String, ? extends DiagSpace> getSpaceInitializer(NBConfiguration cfg) {
|
||||
return (String name) -> new DiagSpace(cfg);
|
||||
return (String name) -> new DiagSpace(name, cfg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return super.getConfigModel().add(DiagSpace.getConfigModel());
|
||||
NBConfigModel model = super.getConfigModel();
|
||||
model.add(DiagSpace.getConfigModel());
|
||||
return model;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getReconfigModel() {
|
||||
NBConfigModel model = super.getReconfigModel();
|
||||
NBConfigModel mapperModel = NBReconfigurable.collectModels(DiagDriverAdapter.class, List.of(mapper));
|
||||
return model.add(mapperModel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Function<String, Optional<Map<String, Object>>>> getOpStmtRemappers() {
|
||||
return List.of(
|
||||
stmt -> {
|
||||
if (stmt.matches("^\\w+$")) {
|
||||
return Optional.of(new LinkedHashMap<String, Object>(Map.of("type",stmt)));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
},
|
||||
stmt -> Optional.of(NBParams.one(stmt).getMap())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void applyConfig(NBConfiguration cfg) {
|
||||
super.applyConfig(cfg);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration cfg) {
|
||||
super.applyReconfig(cfg);
|
||||
NBReconfigurable.applyMatching(cfg,List.of(mapper));
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.diag;
|
||||
|
||||
import io.nosqlbench.adapter.diag.optasks.DiagOpTask;
|
||||
import io.nosqlbench.adapter.diag.optasks.DiagTask;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -27,16 +27,16 @@ import java.util.Map;
|
||||
public class DiagOp implements CycleOp<Integer> {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(DiagOp.class);
|
||||
private final List<DiagOpTask> mutators;
|
||||
private final List<DiagTask> mutators;
|
||||
|
||||
public DiagOp(List<DiagOpTask> mutators) {
|
||||
public DiagOp(List<DiagTask> mutators) {
|
||||
this.mutators = mutators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer apply(long value) {
|
||||
Map<String, Object> state = Map.of("cycle", value, "code", 0);
|
||||
for (DiagOpTask mutator : mutators) {
|
||||
for (DiagTask mutator : mutators) {
|
||||
state = mutator.apply(value,state);
|
||||
}
|
||||
return (int) state.getOrDefault("code", 0);
|
||||
|
@ -16,67 +16,109 @@
|
||||
|
||||
package io.nosqlbench.adapter.diag;
|
||||
|
||||
import io.nosqlbench.adapter.diag.optasks.DiagOpTask;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.adapter.diag.optasks.DiagTask;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.params.NBParams;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.NBReconfigurable;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class DiagOpDispenser extends BaseOpDispenser<DiagOp> {
|
||||
public class DiagOpDispenser extends BaseOpDispenser<DiagOp> implements NBReconfigurable {
|
||||
private final static Logger logger = LogManager.getLogger(DiagOpDispenser.class);
|
||||
private final LongFunction<DiagOp> opFunc;
|
||||
private OpFunc opFunc;
|
||||
|
||||
private RateLimiter diagRateLimiter;
|
||||
private LongFunction<DiagSpace> spaceF;
|
||||
private OpFunc opFuncs;
|
||||
|
||||
public DiagOpDispenser(ParsedOp op) {
|
||||
super(op);
|
||||
this.opFunc = resolveOpFunc(op);
|
||||
}
|
||||
|
||||
private LongFunction<DiagOp> resolveOpFunc(ParsedOp op) {
|
||||
List<DiagOpTask> tasks = new ArrayList<>();
|
||||
private OpFunc resolveOpFunc(ParsedOp op) {
|
||||
List<DiagTask> tasks = new ArrayList<>();
|
||||
Set<String> tasknames = op.getDefinedNames();
|
||||
|
||||
/**
|
||||
* Dynamically load diag tasks and add them to the in-memory template used by the op dispenser
|
||||
*/
|
||||
for (String taskname : tasknames) {
|
||||
|
||||
// Get the value of the keyed task name, but throw an error if it is not static or not a map
|
||||
Object taskcfg = op.getStaticValue(taskname, Object.class);
|
||||
// Load this value into a map using the adaptive loading logic of NBParams
|
||||
// This can be a map or a string or a list.
|
||||
// Exactly one instance is required, and we take the field values from it as a map
|
||||
Map<String, Object> cfgmap = NBParams.one(taskcfg).getMap();
|
||||
|
||||
// Get the op config by name, if provided in string or map form, and
|
||||
// produce a normalized map form which contains the type field. If
|
||||
// the type isn't contained in the parsed form, inject the name as short-hand
|
||||
// Also, inject the name into the map
|
||||
Map<String,Object> taskcfg = op.parseStaticCmdMap(taskname, "type");
|
||||
taskcfg.computeIfAbsent("name",l -> taskname);
|
||||
taskcfg.computeIfAbsent("type",l -> taskname);
|
||||
String optype = taskcfg.remove("type").toString();
|
||||
|
||||
// Dynamically load the named task instance, based on the op field key AKA the taskname
|
||||
// and ensure that exactly one is found or throw an error
|
||||
DiagOpTask task = ServiceSelector.of(taskname, ServiceLoader.load(DiagOpTask.class)).getOne();
|
||||
DiagTask task = ServiceSelector.of(optype, ServiceLoader.load(DiagTask.class)).getOne();
|
||||
|
||||
// Load the configuration model of the dynamically loaded task for type-safe configuration
|
||||
NBConfigModel cfgmodel = task.getConfigModel();
|
||||
|
||||
// Apply the raw configuration data to the configuration model, which
|
||||
// both validates the provided configuration fields and
|
||||
// yields a usable configuration that should apply to the loaded task without error or ambiguity
|
||||
NBConfiguration taskconfig = cfgmodel.apply(cfgmap);
|
||||
NBConfiguration taskconfig = cfgmodel.apply(taskcfg);
|
||||
|
||||
// Apply the validated configuration to the loaded task
|
||||
task.applyConfig(taskconfig);
|
||||
|
||||
// Store the task into the diag op's list of things to do when it runs
|
||||
tasks.add(task);
|
||||
}
|
||||
return l -> new DiagOp(tasks);
|
||||
this.opFunc = new OpFunc(tasks);
|
||||
return opFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration recfg) {
|
||||
opFunc.applyReconfig(recfg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getReconfigModel() {
|
||||
return opFunc.getReconfigModel();
|
||||
}
|
||||
|
||||
private final static class OpFunc implements LongFunction<DiagOp>, NBReconfigurable {
|
||||
private final List<DiagTask> tasks;
|
||||
public OpFunc(List<DiagTask> tasks) {
|
||||
this.tasks = tasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DiagOp apply(long value) {
|
||||
return new DiagOp(tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration recfg) {
|
||||
NBReconfigurable.applyMatching(recfg,tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getReconfigModel() {
|
||||
return NBReconfigurable.collectModels(DiagTask.class, tasks);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DiagOp apply(long value) {
|
||||
return opFunc.apply(value);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -18,11 +18,42 @@ package io.nosqlbench.adapter.diag;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.NBReconfigurable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
|
||||
private final DriverSpaceCache<? extends DiagSpace> spaceCache;
|
||||
private final Map<String,DiagOpDispenser> dispensers = new LinkedHashMap<>();
|
||||
|
||||
public DiagOpMapper(DriverSpaceCache<? extends DiagSpace> spaceCache) {
|
||||
this.spaceCache = spaceCache;
|
||||
}
|
||||
|
||||
public class DiagOpMapper implements OpMapper<DiagOp> {
|
||||
@Override
|
||||
public OpDispenser<? extends DiagOp> apply(ParsedOp cmd) {
|
||||
return new DiagOpDispenser(cmd);
|
||||
DiagOpDispenser dispenser = new DiagOpDispenser(cmd);
|
||||
LongFunction<String> spaceName = cmd.getAsFunctionOr("space", "default");
|
||||
LongFunction<DiagSpace> spacef = l -> spaceCache.get(spaceName.apply(l));
|
||||
dispensers.put(cmd.getName(),dispenser);
|
||||
return dispenser;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration recfg) {
|
||||
NBReconfigurable.applyMatching(recfg, dispensers.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getReconfigModel() {
|
||||
return NBReconfigurable.collectModels(this.getClass(),new ArrayList<>(dispensers.values()));
|
||||
}
|
||||
}
|
||||
|
@ -22,29 +22,35 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.Param;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class DiagSpace implements ActivityDefObserver {
|
||||
private final NBConfiguration cfg;
|
||||
private RateLimiter diagRateLimiter;
|
||||
private final Logger logger = LogManager.getLogger(DiagSpace.class);
|
||||
|
||||
public DiagSpace(NBConfiguration cfg) {
|
||||
private final NBConfiguration cfg;
|
||||
private final String name;
|
||||
private RateLimiter diagRateLimiter;
|
||||
private long interval;
|
||||
|
||||
public DiagSpace(String name, NBConfiguration cfg) {
|
||||
this.cfg = cfg;
|
||||
this.name = name;
|
||||
logger.trace("diag space initialized as '" + name + "'");
|
||||
}
|
||||
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
|
||||
this.interval = cfg.get("interval",long.class);
|
||||
}
|
||||
|
||||
public static NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagSpace.class)
|
||||
.add(Param.defaultTo("interval",1000))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
public boolean isLogCycle() {
|
||||
return cfg.getOrDefault("logcycle",false);
|
||||
}
|
||||
|
||||
public void maybeWaitForOp() {
|
||||
public void maybeWaitForOp(double diagrate) {
|
||||
if (diagRateLimiter != null) {
|
||||
long waittime = diagRateLimiter.maybeWaitForOp();
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigurable;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
/**
|
||||
* The base type for building Diagnostic Op tasks, which are composed into
|
||||
* a diagnostic operation in order. This allows for a very flexible compositional
|
||||
* structure which can be extended as needed. The diag driver is meant to
|
||||
* be the core driver for the purposes of validating the NoSQLBench engine
|
||||
* and associated libraries and extensions.
|
||||
*
|
||||
* The tasks are retained as shared across threads by default. This means that if
|
||||
* you want to have per-thread logic or to do things which are not thread-safe,
|
||||
* you need to use ThreadLocal or some other pattern to do so.
|
||||
*
|
||||
* Tasks must provide a no-args constructor (or no constructor, which does the same).
|
||||
* Tasks must specify their configuration requirements via the {@link NBConfigurable}
|
||||
* interface.
|
||||
*
|
||||
* Tasks may be evented for updates from the activity params at runtime by implementing
|
||||
* the {@link io.nosqlbench.nb.api.config.standard.NBReconfigurable} interface.
|
||||
*/
|
||||
public interface DiagTask extends
|
||||
BiFunction<Long,Map<String,Object>, Map<String,Object>>,
|
||||
NBConfigurable,
|
||||
NBNamedElement
|
||||
{
|
||||
Map<String, Object> apply(Long cycle, Map<String, Object> opstate);
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Service(value = DiagTask.class, selector = "diagrate")
|
||||
public class DiagTask_diagrate implements DiagTask, NBReconfigurable {
|
||||
private String name;
|
||||
private RateLimiter rateLimiter;
|
||||
private RateSpec rateSpec;
|
||||
|
||||
private void updateRateLimiter(String spec) {
|
||||
this.rateSpec = new RateSpec(spec);
|
||||
rateLimiter = RateLimiters.createOrUpdate(
|
||||
this,
|
||||
"diag",
|
||||
rateLimiter,
|
||||
rateSpec
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagTask_diagrate.class)
|
||||
.add(Param.required("diagrate", String.class))
|
||||
.add(Param.required("name", String.class))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getReconfigModel() {
|
||||
return ConfigModel.of(DiagTask_diagrate.class)
|
||||
.add(Param.optional("diagrate"))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
this.name = cfg.get("name", String.class);
|
||||
cfg.getOptional("diagrate").ifPresent(this::updateRateLimiter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration recfg) {
|
||||
recfg.getOptional("diagrate").ifPresent(this::updateRateLimiter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
rateLimiter.maybeWaitForOp();
|
||||
return stringObjectMap;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.Param;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Cause a blocking call to delay the initialization
|
||||
* of this owning operation for a number of milliseconds.
|
||||
*/
|
||||
@Service(value= DiagTask.class,selector = "erroroncycle")
|
||||
public class DiagTask_erroroncycle implements DiagTask {
|
||||
|
||||
private String name;
|
||||
private long error_on_cycle;
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
this.name = cfg.get("name",String.class);
|
||||
error_on_cycle = cfg.get("erroroncycle",long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagTask_erroroncycle.class)
|
||||
.add(Param.required("name",String.class))
|
||||
.add(Param.defaultTo("erroroncycle",1L))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
if (error_on_cycle==aLong) {
|
||||
throw new RuntimeException("Diag was requested to stop on cycle " + error_on_cycle);
|
||||
}
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.Param;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Cause a blocking call to delay the initialization
|
||||
* of this owning operation for a number of milliseconds.
|
||||
*/
|
||||
@Service(value= DiagTask.class,selector = "initdelay")
|
||||
public class DiagTask_initdelay implements DiagTask {
|
||||
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
this.name = cfg.get("name",String.class);
|
||||
long initdelay = cfg.get("initdelay",long.class);
|
||||
try {
|
||||
Thread.sleep(initdelay);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagTask_initdelay.class)
|
||||
.add(Param.required("name",String.class))
|
||||
.add(Param.optional("initdelay",Long.class))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.adapter.diag.DiagSpace;
|
||||
import io.nosqlbench.adapter.diag.types.SpaceFuncUser;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
@Service(value= DiagTask.class,selector="log")
|
||||
public class DiagTask_log implements DiagTask, NBConfigurable, SpaceFuncUser {
|
||||
private final static Logger logger = LogManager.getLogger("DIAG");
|
||||
private Level level;
|
||||
private long modulo;
|
||||
private long interval;
|
||||
private LongFunction<DiagSpace> spaceF;
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
if ((aLong % modulo) == 0) {
|
||||
logger.log(level,"cycle=" + aLong+" state="+stringObjectMap.toString());
|
||||
}
|
||||
return stringObjectMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
String level = cfg.getOptional("level").orElse("INFO");
|
||||
this.name = cfg.get("name");
|
||||
this.level = Level.valueOf(level);
|
||||
this.modulo = cfg.get("modulo",long.class);
|
||||
this.interval = cfg.get("interval",long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagTask_log.class)
|
||||
.add(Param.required("name",String.class))
|
||||
.add(Param.optional("level"))
|
||||
.add(Param.defaultTo("modulo", 1))
|
||||
.add(Param.defaultTo("interval",1000))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSpaceFunc(LongFunction<DiagSpace> spaceF) {
|
||||
this.spaceF = spaceF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
@ -17,34 +17,37 @@
|
||||
package io.nosqlbench.adapter.diag.optasks;
|
||||
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.config.standard.Param;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Service(value=DiagOpTask.class,selector="log")
|
||||
public class DiagTask_Log implements DiagOpTask, NBConfigurable {
|
||||
private final static Logger logger = LogManager.getLogger("DIAG");
|
||||
private Level level;
|
||||
@Service(value= DiagTask.class,selector = "noop")
|
||||
public class DiagTask_noop implements DiagTask {
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
logger.log(level,"cycle=" + aLong+" state="+stringObjectMap.toString());
|
||||
return stringObjectMap;
|
||||
}
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
String level = cfg.getOptional("level").orElse("INFO");
|
||||
this.level = Level.valueOf(level);
|
||||
this.name = cfg.get("name",String.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(DiagTask_Log.class)
|
||||
.add(Param.optional("level"))
|
||||
return ConfigModel.of(DiagTask_noop.class)
|
||||
.add(Param.required("name",String.class))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
@ -812,4 +812,7 @@ public class ParsedOp implements LongFunction<Map<String, ?>>, StaticFieldReader
|
||||
return lfa;
|
||||
}
|
||||
|
||||
public Map<String, Object> parseStaticCmdMap(String key, String mainField) {
|
||||
return tmap.parseStaticCmdMap(key, mainField);
|
||||
}
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ public class NBConfiguration {
|
||||
return data == null || data.isEmpty();
|
||||
}
|
||||
|
||||
public Map<String, Object> getMap() {
|
||||
public LinkedHashMap<String, Object> getMap() {
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.engine.api.templating.binders.ListBinder;
|
||||
import io.nosqlbench.engine.api.templating.binders.OrderedMapBinder;
|
||||
import io.nosqlbench.nb.api.config.fieldreaders.DynamicFieldReader;
|
||||
import io.nosqlbench.nb.api.config.fieldreaders.StaticFieldReader;
|
||||
import io.nosqlbench.nb.api.config.params.ParamsParser;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigError;
|
||||
import io.nosqlbench.nb.api.config.standard.NBTypeConverter;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
@ -866,4 +867,8 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
|
||||
}
|
||||
|
||||
|
||||
public Map<String, Object> parseStaticCmdMap(String taskname, String mainField) {
|
||||
Object mapsrc = getStaticValue(taskname);
|
||||
return new LinkedHashMap<String,Object>(ParamsParser.parseToMap(mapsrc,mainField));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user