Merge pull request #779 from nosqlbench/shutdown_hooks

Shutdown hooks
This commit is contained in:
Jonathan Shook 2022-11-22 23:25:02 -06:00 committed by GitHub
commit fa528eebed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 133 additions and 60 deletions

View File

@ -23,7 +23,7 @@ on:
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
permissions:
actions: read
contents: read

View File

@ -17,14 +17,14 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.opmappers.Cqld4CoreOpMapper;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -28,9 +28,9 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.cqld4.optionhelpers.OptionHelpers;
import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -44,7 +44,7 @@ import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
public class Cqld4Space {
public class Cqld4Space implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(Cqld4Space.class);
private final String space;
@ -282,8 +282,8 @@ public class Cqld4Space {
public static NBConfigModel getConfigModel() {
return ConfigModel.of(Cqld4Space.class)
.add(Param.optional("localdc"))
.add(Param.optional(List.of("secureconnectbundle","scb")))
.add(Param.optional(List.of("hosts","host")))
.add(Param.optional(List.of("secureconnectbundle", "scb")))
.add(Param.optional(List.of("hosts", "host")))
.add(Param.optional("driverconfig", String.class))
.add(Param.optional("username", String.class, "user name (see also password and passfile)"))
.add(Param.optional("userfile", String.class, "file to load the username from"))
@ -299,4 +299,13 @@ public class Cqld4Space {
}
@Override
public void close() {
try {
this.getSession().close();
} catch (Exception e) {
logger.warn("auto-closeable cql session threw exception in cql space(" + this.space + "): " + e);
throw e;
}
}
}

View File

@ -51,7 +51,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
@Override
public synchronized OpMapper<DiagOp> getOpMapper() {
if (this.mapper == null) {
this.mapper = new DiagOpMapper(this, getSpaceCache());
this.mapper = new DiagOpMapper(this);
}
return this.mapper;
}

View File

@ -28,9 +28,11 @@ public class DiagOp implements CycleOp<Integer> {
private final static Logger logger = LogManager.getLogger(DiagOp.class);
private final List<DiagTask> mutators;
private final DiagSpace space;
public DiagOp(List<DiagTask> mutators) {
public DiagOp(DiagSpace space, List<DiagTask> mutators) {
this.mutators = mutators;
this.space = space;
}
@Override

View File

@ -17,14 +17,13 @@
package io.nosqlbench.adapter.diag;
import io.nosqlbench.adapter.diag.optasks.DiagTask;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -39,12 +38,12 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
private LongFunction<DiagSpace> spaceF;
private OpFunc opFuncs;
public DiagOpDispenser(DriverAdapter adapter, ParsedOp op) {
public DiagOpDispenser(DiagDriverAdapter adapter, LongFunction<DiagSpace> spaceF, ParsedOp op) {
super(adapter,op);
this.opFunc = resolveOpFunc(op);
this.opFunc = resolveOpFunc(spaceF, op);
}
private OpFunc resolveOpFunc(ParsedOp op) {
private OpFunc resolveOpFunc(LongFunction<DiagSpace> spaceF, ParsedOp op) {
List<DiagTask> tasks = new ArrayList<>();
Set<String> tasknames = op.getDefinedNames();
@ -82,7 +81,7 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
// Store the task into the diag op's list of things to do when it runs
tasks.add(task);
}
this.opFunc = new OpFunc(tasks);
this.opFunc = new OpFunc(spaceF,tasks);
return opFunc;
}
@ -98,13 +97,17 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
private final static class OpFunc implements LongFunction<DiagOp>, NBReconfigurable {
private final List<DiagTask> tasks;
public OpFunc(List<DiagTask> tasks) {
private final LongFunction<DiagSpace> spaceF;
public OpFunc(LongFunction<DiagSpace> spaceF, List<DiagTask> tasks) {
this.tasks = tasks;
this.spaceF = spaceF;
}
@Override
public DiagOp apply(long value) {
return new DiagOp(tasks);
DiagSpace space = spaceF.apply(value);
return new DiagOp(space, tasks);
}
@Override

View File

@ -16,14 +16,12 @@
package io.nosqlbench.adapter.diag;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@ -31,20 +29,17 @@ import java.util.Map;
import java.util.function.LongFunction;
public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
private final DriverSpaceCache<? extends DiagSpace> spaceCache;
private final Map<String,DiagOpDispenser> dispensers = new LinkedHashMap<>();
private final DriverAdapter adapter;
private final DiagDriverAdapter adapter;
public DiagOpMapper(DriverAdapter adapter, DriverSpaceCache<? extends DiagSpace> spaceCache) {
this.spaceCache = spaceCache;
public DiagOpMapper(DiagDriverAdapter adapter) {
this.adapter = adapter;
}
@Override
public OpDispenser<? extends DiagOp> apply(ParsedOp op) {
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,op);
LongFunction<String> spaceName = op.getAsFunctionOr("space", "default");
LongFunction<DiagSpace> spacef = l -> spaceCache.get(spaceName.apply(l));
LongFunction<DiagSpace> spaceF = adapter.getSpaceFunc(op);
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,spaceF,op);
dispensers.put(op.getName(),dispenser);
return dispenser;
}

View File

@ -26,13 +26,14 @@ import io.nosqlbench.api.config.standard.Param;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DiagSpace implements ActivityDefObserver {
public class DiagSpace implements ActivityDefObserver, AutoCloseable {
private final Logger logger = LogManager.getLogger(DiagSpace.class);
private final NBConfiguration cfg;
private final String name;
private RateLimiter diagRateLimiter;
private long interval;
private boolean errorOnClose;
public DiagSpace(String name, NBConfiguration cfg) {
this.cfg = cfg;
@ -42,11 +43,13 @@ public class DiagSpace implements ActivityDefObserver {
public void applyConfig(NBConfiguration cfg) {
this.interval = cfg.get("interval",long.class);
this.errorOnClose = cfg.get("erroronclose",boolean.class);
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(DiagSpace.class)
.add(Param.defaultTo("interval",1000))
.add(Param.defaultTo("erroronclose", false))
.asReadOnly();
}
@ -61,4 +64,12 @@ public class DiagSpace implements ActivityDefObserver {
NBConfiguration cfg = getConfigModel().apply(activityDef.getParams().getStringStringMap());
this.applyConfig(cfg);
}
@Override
public void close() throws Exception {
logger.debug("closing diag space '" + this.name + "'");
if (errorOnClose) {
throw new RuntimeException("diag space was configured to throw this error when it was configured.");
}
}
}

View File

@ -20,6 +20,8 @@ import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.engine.api.activityimpl.uniform.fieldmappers.FieldDestructuringMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@ -29,7 +31,8 @@ import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter<R,S>, NBConfigurable, NBReconfigurable {
public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapter<R, S>, NBConfigurable, NBReconfigurable {
private final static Logger logger = LogManager.getLogger("ADAPTER");
private DriverSpaceCache<? extends S> spaceCache;
private NBConfiguration cfg;
@ -43,22 +46,22 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
*/
@Override
public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() {
List<Function<Map<String,Object>,Map<String,Object>>> mappers = new ArrayList<>();
List<Function<Map<String,Object>,Map<String,Object>>> stmtRemappers =
List<Function<Map<String, Object>, Map<String, Object>>> mappers = new ArrayList<>();
List<Function<Map<String, Object>, Map<String, Object>>> stmtRemappers =
getOpStmtRemappers().stream()
.map(m -> new FieldDestructuringMapper("stmt",m))
.map(m -> new FieldDestructuringMapper("stmt", m))
.collect(Collectors.toList());
mappers.addAll(stmtRemappers);
mappers.addAll(getOpFieldRemappers());
if (mappers.size()==0) {
if (mappers.size() == 0) {
return (i) -> i;
}
Function<Map<String,Object>,Map<String,Object>> remapper = null;
Function<Map<String, Object>, Map<String, Object>> remapper = null;
for (int i = 0; i < mappers.size(); i++) {
if (i==0) {
remapper=mappers.get(i);
if (i == 0) {
remapper = mappers.get(i);
} else {
remapper = remapper.andThen(mappers.get(i));
}
@ -102,7 +105,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
*
* @return A list of optionally applied remapping functions.
*/
public List<Function<String, Optional<Map<String,Object>>>> getOpStmtRemappers() {
public List<Function<String, Optional<Map<String, Object>>>> getOpStmtRemappers() {
return List.of();
}
@ -112,14 +115,14 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
* @return
*/
@Override
public List<Function<Map<String,Object>,Map<String,Object>>> getOpFieldRemappers() {
public List<Function<Map<String, Object>, Map<String, Object>>> getOpFieldRemappers() {
return List.of();
}
@Override
public synchronized final DriverSpaceCache<? extends S> getSpaceCache() {
if (spaceCache==null) {
spaceCache=new DriverSpaceCache<>(getSpaceInitializer(getConfiguration()));
if (spaceCache == null) {
spaceCache = new DriverSpaceCache<>(getSpaceInitializer(getConfiguration()));
}
return spaceCache;
}
@ -149,7 +152,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
public NBConfigModel getConfigModel() {
return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("alias"))
.add(Param.defaultTo("strict",true,"strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form"))
.add(Param.optional("tags", String.class, "tags to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration"))
@ -162,7 +165,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver",String.class))
.add(Param.optional("driver", String.class))
.asReadOnly();
}

View File

@ -148,7 +148,15 @@ public interface DriverAdapter<OPTYPE extends Op, SPACETYPE> {
DriverSpaceCache<? extends SPACETYPE> getSpaceCache();
/**
* @return A function which can initialize a new S
* This method allows each driver adapter to create named state which is automatically
* cached and re-used by name. For each (driver,space) combination in an activity,
* a distinct space instance will be created. In general, adapter developers will
* use the space type associated with an adapter to wrap native driver instances
* one-to-one. As such, if the space implementation is a {@link AutoCloseable},
* it will be explicitly shutdown as part of the activity shutdown.
*
* @return A function which can initialize a new Space, which is a place to hold
* object state related to retained objects for the lifetime of a native driver.
*/
default Function<String, ? extends SPACETYPE> getSpaceInitializer(NBConfiguration cfg) {
return n -> null;

View File

@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@ -64,4 +66,8 @@ public class DriverSpaceCache<S> {
return cache.computeIfAbsent(name, newSpaceFunction);
}
public Map<String,S> getElements() {
return Collections.unmodifiableMap(cache);
}
}

View File

@ -53,11 +53,13 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
private final int maxTries;
public StandardAction(A activity, int slot) {
this.activity = activity;
this.opsequence = activity.getOpSequence();
this.slot = slot;
this.maxTries = activity.getMaxTries();
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
@ -84,7 +86,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
while (op != null) {
int tries = 0;
while (tries++ <= activity.getMaxTries()) {
while (tries++ <= maxTries) {
Throwable error = null;
long startedAt = System.nanoTime();

View File

@ -58,12 +58,11 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
} else {
yamlmodel = ConfigModel.of(StandardActivity.class).asReadOnly();
}
ServiceLoader<DriverAdapter> adapterLoader = ServiceLoader.load(DriverAdapter.class);
@ -77,7 +76,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
List<DriverAdapter> adapterlist = new ArrayList<>();
for (OpTemplate ot : opTemplates) {
ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of());
String driverName = incompleteOpDef.takeOptionalStaticValue("driver",String.class)
String driverName = incompleteOpDef.takeOptionalStaticValue("driver", String.class)
.or(() -> activityDef.getParams().getOptionalString("driver"))
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
@ -99,13 +98,13 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName,adapter);
mappers.put(driverName,adapter.getOpMapper());
adapters.put(driverName, adapter);
mappers.put(driverName, adapter.getOpMapper());
}
DriverAdapter adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot,adapter.getConfiguration(),List.of(adapter.getPreprocessor()));
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()));
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
pops.add(pop);
}
@ -152,13 +151,13 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
NBReconfigurable.applyMatching(cfg, List.of(configurable));
}
}
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
@ -169,4 +168,26 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
return opTemplates;
}
/**
* This is done here since driver adapters are intended to keep all of their state within
* dedicated <em>state space</em> types. Any space which implements {@link io.nosqlbench.engine.api.activityapi.core.Shutdownable}
* will be closed when this activity shuts down.
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?,?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
if (space instanceof AutoCloseable autocloseable) {
try {
autocloseable.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + adapterName + ", space=" + spaceName + ": " + e, e);
}
}
});
}
}
}

View File

@ -86,5 +86,18 @@ class ExitStatusIntegrationTests {
assertThat(result.exitStatus).isEqualTo(2);
}
// This will not work reliablyl until the activity shutdown bug is fixed.
// @Test
// public void testCloseErrorHandlerOnSpace() {
// ProcessInvoker invoker = new ProcessInvoker();
// invoker.setLogDir("logs/test");
// ProcessResult result = invoker.run("exitstatus_erroronclose", 30,
// java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run",
// "driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv"
// );
// String stdout = String.join("\n", result.getStdoutData());
// String stderr = String.join("\n", result.getStderrData());
// assertThat(result.exception).isNotNull();
// assertThat(result.exception.getMessage()).contains("diag space was configured to throw");
// }
}