merged my-NB5-I446 with main

This commit is contained in:
Mike Yaacoub 2022-12-19 15:57:58 -05:00
commit a06e1556f0
51 changed files with 870 additions and 392 deletions

View File

@ -27,13 +27,13 @@ jobs:
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- name: mvn package
- name: mvn-package
run: mvn package
- name: export docs
run: nb5/target/nb5 export-docs
- name: archive docs
- name: upload docs artifact
uses: actions/upload-artifact@v3
with:
name: exported-docs
@ -42,13 +42,16 @@ jobs:
- name: mvn verify
run: mvn verify
- name: Capture
if: success() || failure()
run: tar -cvf logfiles.tar [a-zA-Z]**/logs/*
- name: Archive Test Results
if: always()
if: success() || failure()
uses: actions/upload-artifact@v3
with:
name: test-results
path: |
[a-zA-Z]**/logs/*
path: logfiles.tar
docs:

View File

@ -23,7 +23,7 @@ on:
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
permissions:
actions: read
contents: read

View File

@ -44,7 +44,7 @@
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.12</version>
<version>3.0.13</version>
</dependency>
<dependency>
@ -67,13 +67,13 @@
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.14.1</version>
<version>4.15.0</version>
</dependency>
<dependency>
<groupId>org.snakeyaml</groupId>
<artifactId>snakeyaml-engine</artifactId>
<version>2.4</version>
<version>2.5</version>
</dependency>
<dependency>

View File

@ -17,14 +17,14 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.opmappers.Cqld4CoreOpMapper;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -28,9 +28,9 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.cqld4.optionhelpers.OptionHelpers;
import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -44,7 +44,7 @@ import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
public class Cqld4Space {
public class Cqld4Space implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(Cqld4Space.class);
private final String space;
@ -139,7 +139,7 @@ public class Cqld4Space {
try {
username = Files.readAllLines(path).get(0);
} catch (IOException e) {
String error = "Error while reading username from file:" + usernameOpt.get();
String error = "Error while reading username from file:" + path;
logger.error(error, e);
throw new RuntimeException(e);
}
@ -155,7 +155,7 @@ public class Cqld4Space {
try {
password = Files.readAllLines(path).get(0);
} catch (IOException e) {
String error = "Error while reading password from file:" + passfileOpt.get();
String error = "Error while reading password from file:" + path;
logger.error(error, e);
throw new RuntimeException(e);
}
@ -282,8 +282,8 @@ public class Cqld4Space {
public static NBConfigModel getConfigModel() {
return ConfigModel.of(Cqld4Space.class)
.add(Param.optional("localdc"))
.add(Param.optional(List.of("secureconnectbundle","scb")))
.add(Param.optional(List.of("hosts","host")))
.add(Param.optional(List.of("secureconnectbundle", "scb")))
.add(Param.optional(List.of("hosts", "host")))
.add(Param.optional("driverconfig", String.class))
.add(Param.optional("username", String.class, "user name (see also password and passfile)"))
.add(Param.optional("userfile", String.class, "file to load the username from"))
@ -299,4 +299,13 @@ public class Cqld4Space {
}
@Override
public void close() {
try {
this.getSession().close();
} catch (Exception e) {
logger.warn("auto-closeable cql session threw exception in cql space(" + this.space + "): " + e);
throw e;
}
}
}

View File

@ -51,7 +51,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
@Override
public synchronized OpMapper<DiagOp> getOpMapper() {
if (this.mapper == null) {
this.mapper = new DiagOpMapper(this, getSpaceCache());
this.mapper = new DiagOpMapper(this);
}
return this.mapper;
}

View File

@ -28,9 +28,11 @@ public class DiagOp implements CycleOp<Integer> {
private final static Logger logger = LogManager.getLogger(DiagOp.class);
private final List<DiagTask> mutators;
private final DiagSpace space;
public DiagOp(List<DiagTask> mutators) {
public DiagOp(DiagSpace space, List<DiagTask> mutators) {
this.mutators = mutators;
this.space = space;
}
@Override

View File

@ -17,14 +17,13 @@
package io.nosqlbench.adapter.diag;
import io.nosqlbench.adapter.diag.optasks.DiagTask;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -39,12 +38,12 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
private LongFunction<DiagSpace> spaceF;
private OpFunc opFuncs;
public DiagOpDispenser(DriverAdapter adapter, ParsedOp op) {
public DiagOpDispenser(DiagDriverAdapter adapter, LongFunction<DiagSpace> spaceF, ParsedOp op) {
super(adapter,op);
this.opFunc = resolveOpFunc(op);
this.opFunc = resolveOpFunc(spaceF, op);
}
private OpFunc resolveOpFunc(ParsedOp op) {
private OpFunc resolveOpFunc(LongFunction<DiagSpace> spaceF, ParsedOp op) {
List<DiagTask> tasks = new ArrayList<>();
Set<String> tasknames = op.getDefinedNames();
@ -82,7 +81,7 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
// Store the task into the diag op's list of things to do when it runs
tasks.add(task);
}
this.opFunc = new OpFunc(tasks);
this.opFunc = new OpFunc(spaceF,tasks);
return opFunc;
}
@ -98,13 +97,17 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
private final static class OpFunc implements LongFunction<DiagOp>, NBReconfigurable {
private final List<DiagTask> tasks;
public OpFunc(List<DiagTask> tasks) {
private final LongFunction<DiagSpace> spaceF;
public OpFunc(LongFunction<DiagSpace> spaceF, List<DiagTask> tasks) {
this.tasks = tasks;
this.spaceF = spaceF;
}
@Override
public DiagOp apply(long value) {
return new DiagOp(tasks);
DiagSpace space = spaceF.apply(value);
return new DiagOp(space, tasks);
}
@Override

View File

@ -16,14 +16,12 @@
package io.nosqlbench.adapter.diag;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@ -31,20 +29,17 @@ import java.util.Map;
import java.util.function.LongFunction;
public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
private final DriverSpaceCache<? extends DiagSpace> spaceCache;
private final Map<String,DiagOpDispenser> dispensers = new LinkedHashMap<>();
private final DriverAdapter adapter;
private final DiagDriverAdapter adapter;
public DiagOpMapper(DriverAdapter adapter, DriverSpaceCache<? extends DiagSpace> spaceCache) {
this.spaceCache = spaceCache;
public DiagOpMapper(DiagDriverAdapter adapter) {
this.adapter = adapter;
}
@Override
public OpDispenser<? extends DiagOp> apply(ParsedOp op) {
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,op);
LongFunction<String> spaceName = op.getAsFunctionOr("space", "default");
LongFunction<DiagSpace> spacef = l -> spaceCache.get(spaceName.apply(l));
LongFunction<DiagSpace> spaceF = adapter.getSpaceFunc(op);
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,spaceF,op);
dispensers.put(op.getName(),dispenser);
return dispenser;
}

View File

@ -26,13 +26,14 @@ import io.nosqlbench.api.config.standard.Param;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DiagSpace implements ActivityDefObserver {
public class DiagSpace implements ActivityDefObserver, AutoCloseable {
private final Logger logger = LogManager.getLogger(DiagSpace.class);
private final NBConfiguration cfg;
private final String name;
private RateLimiter diagRateLimiter;
private long interval;
private boolean errorOnClose;
public DiagSpace(String name, NBConfiguration cfg) {
this.cfg = cfg;
@ -42,11 +43,13 @@ public class DiagSpace implements ActivityDefObserver {
public void applyConfig(NBConfiguration cfg) {
this.interval = cfg.get("interval",long.class);
this.errorOnClose = cfg.get("erroronclose",boolean.class);
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(DiagSpace.class)
.add(Param.defaultTo("interval",1000))
.add(Param.defaultTo("erroronclose", false))
.asReadOnly();
}
@ -61,4 +64,12 @@ public class DiagSpace implements ActivityDefObserver {
NBConfiguration cfg = getConfigModel().apply(activityDef.getParams().getStringStringMap());
this.applyConfig(cfg);
}
@Override
public void close() throws Exception {
logger.debug("closing diag space '" + this.name + "'");
if (errorOnClose) {
throw new RuntimeException("diag space was configured to throw this error when it was configured.");
}
}
}

View File

@ -28,7 +28,7 @@ import java.util.Map;
* Cause a blocking call to delay the initialization
* of this owning operation for a number of milliseconds.
*/
@Service(value= DiagTask.class,selector = "erroroncycle")
@Service(value = DiagTask.class, selector = "erroroncycle")
public class DiagTask_erroroncycle implements DiagTask {
private String name;
@ -36,21 +36,21 @@ public class DiagTask_erroroncycle implements DiagTask {
@Override
public void applyConfig(NBConfiguration cfg) {
this.name = cfg.get("name",String.class);
error_on_cycle = cfg.get("erroroncycle",long.class);
this.name = cfg.get("name", String.class);
error_on_cycle = cfg.get("erroroncycle", long.class);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(DiagTask_erroroncycle.class)
.add(Param.required("name",String.class))
.add(Param.defaultTo("erroroncycle",1L))
.asReadOnly();
.add(Param.required("name", String.class))
.add(Param.defaultTo("erroroncycle", 1L))
.asReadOnly();
}
@Override
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
if (error_on_cycle==aLong) {
if (error_on_cycle == aLong) {
throw new RuntimeException("Diag was requested to stop on cycle " + error_on_cycle);
}
return Map.of();

View File

@ -45,7 +45,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.325</version>
<version>1.12.348</version>
</dependency>
</dependencies>

View File

@ -103,7 +103,8 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
protected LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue("seqerr_simu", String.class)
setStringLongFunction = (l) ->
parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
@ -118,7 +119,9 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
return set;
}).orElse(Collections.emptySet());
logger.info("seqerr_simu: {}", setStringLongFunction.apply(0));
logger.info(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
setStringLongFunction.apply(0));
return setStringLongFunction;
}
}

View File

@ -49,6 +49,7 @@ public class PulsarAdapterUtil {
TRANSACT_BATCH_NUM("transact_batch_num"),
ADMIN_DELOP("admin_delop"),
SEQ_TRACKING("seq_tracking"),
SEQERR_SIMU("seqerr_simu"),
RTT_TRACKING_FIELD("payload_traking_field"),
MSG_DEDUP_BROKER("msg_dedup_broker"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
@ -63,6 +64,43 @@ public class PulsarAdapterUtil {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
///////
// Message processing sequence error simulation types
public enum MSG_SEQ_ERROR_SIMU_TYPE {
OutOfOrder("out_of_order"),
MsgLoss("msg_loss"),
MsgDup("msg_dup");
public final String label;
MSG_SEQ_ERROR_SIMU_TYPE(String label) {
this.label = label;
}
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
static {
for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) {
MAPPING.put(simuType.label, simuType);
MAPPING.put(simuType.label.toLowerCase(), simuType);
MAPPING.put(simuType.label.toUpperCase(), simuType);
MAPPING.put(simuType.name(), simuType);
MAPPING.put(simuType.name().toLowerCase(), simuType);
MAPPING.put(simuType.name().toUpperCase(), simuType);
}
}
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSeqErrSimuTypeList() {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid Pulsar API type
public enum PULSAR_API_TYPE {
@ -382,43 +420,6 @@ public class PulsarAdapterUtil {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Message processing sequence error simulation types
public enum MSG_SEQ_ERROR_SIMU_TYPE {
OutOfOrder("out_of_order"),
MsgLoss("msg_loss"),
MsgDup("msg_dup");
public final String label;
MSG_SEQ_ERROR_SIMU_TYPE(String label) {
this.label = label;
}
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
static {
for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) {
MAPPING.put(simuType.label, simuType);
MAPPING.put(simuType.label.toLowerCase(), simuType);
MAPPING.put(simuType.label.toUpperCase(), simuType);
MAPPING.put(simuType.name(), simuType);
MAPPING.put(simuType.name().toLowerCase(), simuType);
MAPPING.put(simuType.name().toUpperCase(), simuType);
}
}
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSeqErrSimuTypeList() {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Primitive Schema type
public static boolean isPrimitiveSchemaTypeStr(String typeStr) {

View File

@ -26,8 +26,6 @@ client.authParams=
### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.producerName=
producer.topicName=
producer.sendTimeoutMs=
producer.blockIfQueueFull=true

View File

@ -0,0 +1,253 @@
- [1. Overview](#1-overview)
- [1.1. Issues Tracker](#11-issues-tracker)
- [2. Execute the NB Pulsar Driver Workload](#2-execute-the-nb-pulsar-driver-workload)
- [2.1. NB Pulsar Driver Yaml File High Level Structure](#21-nb-pulsar-driver-yaml-file-high-level-structure)
- [2.2. NB Pulsar Driver Configuration Parameters](#22-nb-pulsar-driver-configuration-parameters)
- [2.2.1. Global Level Parameters](#221-global-level-parameters)
- [2.2.2. Document Level Parameters](#222-document-level-parameters)
- [3. NB Pulsar Driver OpTemplates](#3-nb-pulsar-driver-optemplates)
- [4. Message Generation and Schema Support](#4-message-generation-and-schema-support)
- [4.1. Message Generation](#41-message-generation)
- [4.2. Schema Support](#42-schema-support)
# 1. Overview
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
* Admin API - create/delete tenants
* Admin API - create/delete namespaces
* Admin API - create/delete topics
* Topics can be partitioned or non-partitioned
* Producer - publish messages with schema support
* Default schema type is byte[]
* Avro schema and KeyValue schema are also supported
* Consumer - consume messages with schema support and the following support
* Different subscription types
* Multi-topic subscription (including Topic patterns)
* Subscription initial position
* Dead letter topic policy
* Negative acknowledgement and acknowledgement timeout redelivery backoff policy
## 1.1. Issues Tracker
If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
# 2. Execute the NB Pulsar Driver Workload
In order to run an NB Pulsar driver workload, use a command similar to that of other NB driver types, but note that it has its own unique execution parameters. The general command has the following format:
```shell
<nb_cmd> run driver=pulsar threads=<thread_num> cycles=<cycle_count> web_url=<pulsar_web_svc_url> service_url=<pulsar_svc_url> config=<pulsar_client_config_property_file> yaml=<nb_scenario_yaml_file> [<other_common_NB_execution_parameters>]
```
In the above command, make sure the driver type is **pulsar** and provide the following Pulsar driver specific parameters:
* ***web_url***: Pulsar web service url; defaults to "http://localhost:8080"
* ***service_url***: Pulsar native protocol service url; defaults to "pulsar://localhost:6650"
* ***config***: Pulsar schema/client/producer/consumer related configuration (as a property file)
## 2.1. NB Pulsar Driver Yaml File High Level Structure
Just like other NB driver types, the actual NB Pulsar workload is defined in a YAML file with the following high level structure:
```yaml
description: |
...
bindings:
...
params:
...
blocks:
<block_1>:
ops:
op1:
<OpTypeIdentifier>: "<static_or_dynamic_value>"
<op_param_1>: "<some_value>"
<op_param_2>: "<some_value>"
...
<block_2>:
...
```
* ***description***: This is an (optional) section in which to provide a general description of the Pulsar NB workload defined in this file.
* ***bindings***: This section defines all NB bindings that are required in all OpTemplate blocks
* ***params***: This section defines **Document level** configuration parameters that apply to all OpTemplate blocks.
* ***blocks***: This section defines the OpTemplate blocks that are needed to execute Pulsar specific workloads. Each OpTemplate block may contain multiple OpTemplates.
## 2.2. NB Pulsar Driver Configuration Parameters
The NB Pulsar driver configuration parameters can be set at 3 different levels:
* Global level
* Document level
* The parameters at this level are those within a NB yaml file that impact all OpTemplates
* Op level (or Cycle level)
* The parameters at this level are those within a NB yaml file that are associated with each individual OpTemplate
Please **NOTE** that when a parameter is specified at multiple levels, the one at the lowest level takes precedence.
### 2.2.1. Global Level Parameters
The parameters at this level are those listed in the command line config properties file.
The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to complete its workloads such as creating/deleting tenants/namespaces/topics, generating messages, creating producers to send messages, and creating consumers to receive messages. The Pulsar client API has different configuration parameters to control the execution behavior. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration parameters for how a Pulsar producer can be created.
All these Pulsar "native" parameters are supported by the NB Pulsar driver, via the global configuration properties file (e.g. **config.properties**). An example of the structure of this file looks like below:
```properties
### Schema related configurations - MUST start with prefix "schema."
#schema.key.type=avro
#schema.key.definition=</path/to/avro-key-example.avsc>
schema.type=avro
schema.definition=</path/to/avro-value-example.avsc>
### Pulsar client related configurations - MUST start with prefix "client."
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.connectionTimeoutMs=5000
client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
client.authParams=
# ...
### Producer related configurations (global) - MUST start with prefix "producer."
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.sendTimeoutMs=
producer.blockIfQueueFull=true
# ...
### Consumer related configurations (global) - MUST start with prefix "consumer."
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
consumer.subscriptionInitialPosition=Earliest
consumer.deadLetterPolicy={"maxRedeliverCount":"5","retryLetterTopic":"public/default/retry","deadLetterTopic":"public/default/dlq","initialSubscriptionName":"dlq-sub"}
consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"10","maxDelayMs":"20","multiplier":"1.2"}
# ...
```
There are multiple sections in this file that correspond to different
categories of the configuration parameters:
* **`Pulsar Schema` related settings**:
* All settings under this section start with the **schema.** prefix.
* At the moment, there are 3 schema types supported
* Default raw ***byte[]***
* Avro schema for the message payload
* KeyValue based Avro schema for both message key and message payload
* **`Pulsar Client` related settings**:
* All settings under this section start with the **client.** prefix.
* This section defines all configuration parameters that are related with defining a PulsarClient object.
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* **`Pulsar Producer` related settings**:
* All settings under this section start with the **producer.** prefix.
* This section defines all configuration parameters that are related with defining a Pulsar Producer object.
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* **`Pulsar Consumer` related settings**:
* All settings under this section start with the **consumer.** prefix.
* This section defines all configuration parameters that are related with defining a Pulsar Consumer object.
* See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
### 2.2.2. Document Level Parameters
For the Pulsar NB driver, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
* ***async_api*** (boolean):
* When true, use async Pulsar client API.
* ***use_transaction*** (boolean):
* When true, use Pulsar transaction.
* ***admin_delop*** (boolean):
* When true, delete Tenants/Namespaces/Topics. Otherwise, create them.
* Only applicable to administration related operations
* ***seq_tracking*** (boolean):
* When true, a sequence number is created as part of each message's properties
* This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully.
* ***seqerr_simu***:
* A list of error simulation types separated by comma (,)
* Valid error simulation types
* `out_of_order`: simulate message out of sequence
* `msg_loss`: simulate message loss
* `msg_dup`: simulate message duplication
* ***e2e_starting_time_source***:
* Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch.
* The possible values for `e2e_starting_time_source`:
* `message_publish_time` : uses the message publishing timestamp as the starting time
* `message_event_time` : uses the message event timestamp as the starting time
* `message_property_e2e_starting_time` : uses a message property `e2e_starting_time` as the starting time.
# 3. NB Pulsar Driver OpTemplates
For the NB Pulsar driver, each OpTemplate has the following format:
```yaml
blocks:
<some_block_name>:
ops:
<some_op_name>:
<OpTypeIdentifier>: <tenant|namespace|topic_name>
<op_param_1>: "<some_value>"
<op_param_2>: "<some_value>"
...
```
The `OpTypeIdentifier` determines which NB Pulsar workload type (`OpType`) to run, and it has the following value:
```java
public enum PulsarOpType {
AdminTenant,
AdminNamespace,
AdminTopic,
MessageProduce,
MessageConsume;
}
```
The `OpTypeIdentifier` value is mandatory for each NB Pulsar operation type. Depending on the actual identifier, its value can be one of the following:
* ***Tenant name***: for `AdminTenant` type
* ***Namespace name***: for `AdminNamespace` type and in format "<tenant>/<namespace>"
* ***Topic name***: for the rest of the types and in format [(persistent|non-persistent)://]<tenant>/<namespace>/<topic>
Each Pulsar `OpType` may have optional Op specific parameters. Please refer to [here](yaml_examples) for the example NB Pulsar YAML files for each OpType
# 4. Message Generation and Schema Support
## 4.1. Message Generation
A Pulsar message has three main components: message key, message properties, and message payload. Among them, message payload is mandatory when creating a message.
When running the "message producing" workload, the NB Pulsar driver is able to generate a message with its full content via the following OpTemplate level parameters:
* `msg_key`: defines message key value
* `msg_property`: defines message property values
* `msg_value`: defines message payload value
The actual values of them can be static or dynamic (which are determined by NB data binding rules)
For `msg_key`, its value can be either
* a plain text string, or
* a JSON string that follows the specified "key" Avro schema (when KeyValue schema is used)
For `msg_property`, its value needs to be a JSON string that contains a list of key-value pairs. An example is as below. Please **NOTE** that if the provided value is not a valid JSON string, the NB Pulsar driver will ignore it and treat the message as having no properties.
```
msg_property: |
{
"prop1": "{myprop1}",
"prop2": "{myprop2}"
}
```
For `msg_value`, its value can be either
* a plain simple text, or
* a JSON string that follows the specified "value" Avro schema (when Avro schema or KeyValue schema is used)
## 4.2. Schema Support
The NB Pulsar driver supports the following Pulsar schema types:
* Primitive schema types
* Avro schema type (only for message payload - `msg_value`)
* KeyValue schema type (with both key and value follows an Avro schema)
The following 2 global configuration parameters define the required schema type
* `schema.key.type`: defines message key type
* `schema.type`: defines message value type
For them, if the parameter value is not specified, it means using the default `byte[]/BYTES` type as the schema type. Otherwise, if it is specified as "avro", it means using Avro as the schema type.
The following 2 global configuration parameters define the schema specification (**ONLY** needed when Avro is the schema type)
* `schema.key.definition`: a file path that defines the message key Avro schema specification
* `schema.definition`: a file path that defines the message value Avro schema specification
The NB Pulsar driver will throw an error if the schema type is Avro but the schema specification definition file is not provided or is not valid.

View File

@ -9,8 +9,6 @@ params:
blocks:
admin-namespace-block:
tags:
phase: admin-namespace
ops:
op1:
AdminNamespace: "{tenant}/{namespace}"

View File

@ -8,8 +8,6 @@ params:
blocks:
admin-tenant-block:
tags:
phase: admin-tenant
ops:
op1:
AdminTopic: "{tenant}"

View File

@ -10,8 +10,6 @@ params:
blocks:
admin-topic-block:
tags:
phase: admin-topic
ops:
op1:
AdminTopic: "{tenant}/{namespace}/{topic}"

View File

@ -0,0 +1,11 @@
params:
async_api: "true"
blocks:
msg-consume-block:
ops:
op1:
MessageConsume: "tnt0/ns0/tp0"
consumerName: ""
subscriptionName: "mynbsub"
subscriptionType: "shared"

View File

@ -1,28 +1,25 @@
bindings:
# message key and value
mykey: NumberNameToString()
location: Cities();
well_id: ToUUID();ToString();
sensor_id: ToUUID();ToString();
reading_time: ToDateTime();
reading_value: ToFloat(100);
# document level parameters that apply to all Pulsar client types:
params:
async_api: "true"
blocks:
msg-produce-block:
tags:
phase: msg-send
ops:
op1:
MessageProduce: "tnt0/ns0/tp1"
producerName: ""
msg_key: |
{
"Location": "{location}",
"WellID": "{well_id}"
}
msg_properties: ""
msg_value: |
{
"SensorID": "{sensor_id}",
@ -30,12 +27,3 @@ blocks:
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
msg-consume-block:
tags:
phase: msg-recv
ops:
op1:
MessageConsume: "tnt0/ns0/tp0"
subscription_name: "mynbsub"
# subscription_type: "shared"

View File

@ -11,8 +11,6 @@ params:
blocks:
msg-produce-block:
tags:
phase: msg-send
ops:
op1:
MessageProduce: "tnt0/ns0/tp0"
@ -23,12 +21,3 @@ blocks:
"prop2": "{text_prop_val}"
}
msg_value: "{myvalue}"
msg-consume-block:
tags:
phase: msg-recv
ops:
op1:
MessageConsume: "tnt0/ns0/tp0"
subscriptionName: "mynbsub"
subscriptionType: "Shared"

View File

@ -20,6 +20,8 @@ import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.engine.api.activityimpl.uniform.fieldmappers.FieldDestructuringMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@ -29,7 +31,8 @@ import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter<R,S>, NBConfigurable, NBReconfigurable {
public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapter<R, S>, NBConfigurable, NBReconfigurable {
private final static Logger logger = LogManager.getLogger("ADAPTER");
private DriverSpaceCache<? extends S> spaceCache;
private NBConfiguration cfg;
@ -43,22 +46,22 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
*/
@Override
public final Function<Map<String, Object>, Map<String, Object>> getPreprocessor() {
List<Function<Map<String,Object>,Map<String,Object>>> mappers = new ArrayList<>();
List<Function<Map<String,Object>,Map<String,Object>>> stmtRemappers =
List<Function<Map<String, Object>, Map<String, Object>>> mappers = new ArrayList<>();
List<Function<Map<String, Object>, Map<String, Object>>> stmtRemappers =
getOpStmtRemappers().stream()
.map(m -> new FieldDestructuringMapper("stmt",m))
.map(m -> new FieldDestructuringMapper("stmt", m))
.collect(Collectors.toList());
mappers.addAll(stmtRemappers);
mappers.addAll(getOpFieldRemappers());
if (mappers.size()==0) {
if (mappers.size() == 0) {
return (i) -> i;
}
Function<Map<String,Object>,Map<String,Object>> remapper = null;
Function<Map<String, Object>, Map<String, Object>> remapper = null;
for (int i = 0; i < mappers.size(); i++) {
if (i==0) {
remapper=mappers.get(i);
if (i == 0) {
remapper = mappers.get(i);
} else {
remapper = remapper.andThen(mappers.get(i));
}
@ -102,7 +105,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
*
* @return A list of optionally applied remapping functions.
*/
public List<Function<String, Optional<Map<String,Object>>>> getOpStmtRemappers() {
public List<Function<String, Optional<Map<String, Object>>>> getOpStmtRemappers() {
return List.of();
}
@ -112,14 +115,14 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
* @return
*/
@Override
public List<Function<Map<String,Object>,Map<String,Object>>> getOpFieldRemappers() {
public List<Function<Map<String, Object>, Map<String, Object>>> getOpFieldRemappers() {
return List.of();
}
@Override
public synchronized final DriverSpaceCache<? extends S> getSpaceCache() {
if (spaceCache==null) {
spaceCache=new DriverSpaceCache<>(getSpaceInitializer(getConfiguration()));
if (spaceCache == null) {
spaceCache = new DriverSpaceCache<>(getSpaceInitializer(getConfiguration()));
}
return spaceCache;
}
@ -149,7 +152,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
public NBConfigModel getConfigModel() {
return ConfigModel.of(BaseDriverAdapter.class)
.add(Param.optional("alias"))
.add(Param.defaultTo("strict",true,"strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.defaultTo("strict", true, "strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form"))
.add(Param.optional("tags", String.class, "tags to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration"))
@ -162,7 +165,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver",String.class))
.add(Param.optional("driver", String.class))
.asReadOnly();
}

View File

@ -152,7 +152,15 @@ public interface DriverAdapter<OPTYPE extends Op, SPACETYPE> {
DriverSpaceCache<? extends SPACETYPE> getSpaceCache();
/**
* @return A function which can initialize a new S
* This method allows each driver adapter to create named state which is automatically
* cached and re-used by name. For each (driver,space) combination in an activity,
* a distinct space instance will be created. In general, adapter developers will
* use the space type associated with an adapter to wrap native driver instances
* one-to-one. As such, if the space implementation is a {@link AutoCloseable},
* it will be explicitly shutdown as part of the activity shutdown.
*
* @return A function which can initialize a new Space, which is a place to hold
* object state related to retained objects for the lifetime of a native driver.
*/
default Function<String, ? extends SPACETYPE> getSpaceInitializer(NBConfiguration cfg) {
return n -> null;

View File

@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@ -64,4 +66,8 @@ public class DriverSpaceCache<S> {
return cache.computeIfAbsent(name, newSpaceFunction);
}
public Map<String,S> getElements() {
return Collections.unmodifiableMap(cache);
}
}

View File

@ -22,7 +22,7 @@
<name>docsys</name>
<url>http://nosqlbench.io/</url>
<properties>
<jersey.version>3.0.8</jersey.version>
<jersey.version>3.1.0</jersey.version>
</properties>
<parent>
@ -105,7 +105,7 @@
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.13.4</version>
<version>2.14.0</version>
</dependency>
<dependency>

View File

@ -42,7 +42,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.0</version>
<version>42.5.1</version>
</dependency>
</dependencies>

93
driver-pulsar/pom.xml Normal file
View File

@ -0,0 +1,93 @@
<!--
~ Copyright (c) 2022 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.22-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-pulsar</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A Pulsar driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system.
</description>
<properties>
<pulsar.version>2.10.2</pulsar.version>
</properties>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.22-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
</project>

View File

@ -53,11 +53,13 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
private final int maxTries;
public StandardAction(A activity, int slot) {
this.activity = activity;
this.opsequence = activity.getOpSequence();
this.slot = slot;
this.maxTries = activity.getMaxTries();
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
@ -84,7 +86,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
while (op != null) {
int tries = 0;
while (tries++ <= activity.getMaxTries()) {
while (tries++ <= maxTries) {
Throwable error = null;
long startedAt = System.nanoTime();

View File

@ -58,12 +58,11 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
} else {
yamlmodel = ConfigModel.of(StandardActivity.class).asReadOnly();
}
ServiceLoader<DriverAdapter> adapterLoader = ServiceLoader.load(DriverAdapter.class);
@ -77,7 +76,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
List<DriverAdapter> adapterlist = new ArrayList<>();
for (OpTemplate ot : opTemplates) {
ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of());
String driverName = incompleteOpDef.takeOptionalStaticValue("driver",String.class)
String driverName = incompleteOpDef.takeOptionalStaticValue("driver", String.class)
.or(() -> activityDef.getParams().getOptionalString("driver"))
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
@ -99,13 +98,13 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName,adapter);
mappers.put(driverName,adapter.getOpMapper());
adapters.put(driverName, adapter);
mappers.put(driverName, adapter.getOpMapper());
}
DriverAdapter adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot,adapter.getConfiguration(),List.of(adapter.getPreprocessor()));
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()));
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
pops.add(pop);
}
@ -152,13 +151,13 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
NBReconfigurable.applyMatching(cfg, List.of(configurable));
}
}
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
@ -169,4 +168,26 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
return opTemplates;
}
/**
* This is done here since driver adapters are intended to keep all of their state within
* dedicated <em>state space</em> types. Any space which implements {@link io.nosqlbench.engine.api.activityapi.core.Shutdownable}
* will be closed when this activity shuts down.
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?,?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
if (space instanceof AutoCloseable autocloseable) {
try {
autocloseable.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + adapterName + ", space=" + spaceName + ": " + e, e);
}
}
});
}
}
}

View File

@ -63,10 +63,10 @@ import java.util.stream.Collectors;
public class NBCLI implements Function<String[], Integer> {
private static Logger logger;
private static LoggerConfig loggerConfig;
private static int EXIT_OK = 0;
private static int EXIT_WARNING = 1;
private static int EXIT_ERROR = 2;
private static final LoggerConfig loggerConfig;
private static final int EXIT_OK = 0;
private static final int EXIT_WARNING = 1;
private static final int EXIT_ERROR = 2;
static {
loggerConfig = new LoggerConfig();
@ -83,6 +83,7 @@ public class NBCLI implements Function<String[], Integer> {
* Only call System.exit with the body of main. This is so that other scenario
* invocations are handled functionally by {@link #apply(String[])}, which allows
* for scenario encapsulation and concurrent testing.
*
* @param args Command Line Args
*/
public static void main(String[] args) {
@ -91,15 +92,17 @@ public class NBCLI implements Function<String[], Integer> {
int statusCode = cli.apply(args);
System.exit(statusCode);
} catch (Exception e) {
System.out.println("Not expected issue in main: " + e.getMessage());
}
}
/**
* return null;
* }
* return null;
* }
*
* public static void main(String[] args) {
* @param strings
* public static void main(String[] args) {
*
* @param args
* @return
*/
@Override
@ -114,10 +117,11 @@ public class NBCLI implements Function<String[], Integer> {
if (arg.toLowerCase(Locale.ROOT).startsWith("-v") || (arg.toLowerCase(Locale.ROOT).equals("--show-stacktraces"))) {
showStackTraces = true;
break;
}
}
String error = ScenarioErrorHandler.handle(e, showStackTraces);
String error = NBCLIErrorHandler.handle(e, showStackTraces);
// Commented for now, as the above handler should do everything needed.
if (error != null) {
System.err.println("Scenario stopped due to error. See logs for details.");
@ -150,7 +154,7 @@ public class NBCLI implements Function<String[], Integer> {
.setConsolePattern(globalOptions.getConsoleLoggingPattern())
.setLogfileLevel(globalOptions.getScenarioLogLevel())
.setLogfilePattern(globalOptions.getLogfileLoggingPattern())
.getLoggerLevelOverrides(globalOptions.getLogLevelOverrides())
.setLoggerLevelOverrides(globalOptions.getLogLevelOverrides())
.setMaxLogs(globalOptions.getLogsMax())
.setLogsDirectory(globalOptions.getLogsDirectory())
.setAnsiEnabled(globalOptions.isEnableAnsi())
@ -175,10 +179,10 @@ public class NBCLI implements Function<String[], Integer> {
// Invoke any bundled app which matches the name of the first non-option argument, if it exists.
// If it does not, continue with no fanfare. Let it drop through to other command resolution methods.
if (args.length>0 && args[0].matches("\\w[\\w\\d-_.]+")) {
if (args.length > 0 && args[0].matches("\\w[\\w\\d-_.]+")) {
ServiceSelector<BundledApp> apploader = ServiceSelector.of(args[0], ServiceLoader.load(BundledApp.class));
BundledApp app = apploader.get().orElse(null);
if (app!=null) {
if (app != null) {
String[] appargs = Arrays.copyOfRange(args, 1, args.length);
logger.info("invoking bundled app '" + args[0] + "' (" + app.getClass().getSimpleName() + ").");
globalOptions.setWantsStackTraces(true);
@ -211,10 +215,10 @@ public class NBCLI implements Function<String[], Integer> {
DockerMetricsManager.GRAFANA_TAG, globalOptions.getDockerGrafanaTag(),
DockerMetricsManager.PROM_TAG, globalOptions.getDockerPromTag(),
DockerMetricsManager.TSDB_RETENTION, String.valueOf(globalOptions.getDockerPromRetentionDays()),
DockerMetricsManager.GRAPHITE_SAMPLE_EXPIRY,"10m",
DockerMetricsManager.GRAPHITE_CACHE_SIZE,"5000",
DockerMetricsManager.GRAPHITE_LOG_LEVEL,globalOptions.getGraphiteLogLevel(),
DockerMetricsManager.GRAPHITE_LOG_FORMAT,"logfmt"
DockerMetricsManager.GRAPHITE_SAMPLE_EXPIRY, "10m",
DockerMetricsManager.GRAPHITE_CACHE_SIZE, "5000",
DockerMetricsManager.GRAPHITE_LOG_LEVEL, globalOptions.getGraphiteLogLevel(),
DockerMetricsManager.GRAPHITE_LOG_FORMAT, "logfmt"
);
dmh.startMetrics(dashboardOptions);
@ -262,7 +266,7 @@ public class NBCLI implements Function<String[], Integer> {
for (ServiceLoader.Provider<BundledApp> provider : loader.stream().toList()) {
Class<? extends BundledApp> appType = provider.type();
String name = appType.getAnnotation(Service.class).selector();
System.out.println(String.format("%-40s %s",name,appType.getCanonicalName()));
System.out.printf("%-40s %s%n", name, appType.getCanonicalName());
}
return EXIT_OK;
}
@ -316,25 +320,25 @@ public class NBCLI implements Function<String[], Integer> {
Path writeTo = Path.of(data.asPath().getFileName().toString());
if (Files.exists(writeTo)) {
throw new BasicError("A file named " + writeTo.toString() + " exists. Remove it first.");
throw new BasicError("A file named " + writeTo + " exists. Remove it first.");
}
try {
Files.writeString(writeTo, data.getCharBuffer(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new BasicError("Unable to write to " + writeTo.toString() + ": " + e.getMessage());
throw new BasicError("Unable to write to " + writeTo + ": " + e.getMessage());
}
logger.info("Copied internal resource '" + data.asPath() + "' to '" + writeTo.toString() + "'");
logger.info("Copied internal resource '" + data.asPath() + "' to '" + writeTo + "'");
return EXIT_OK;
}
if (options.wantsInputTypes()) {
InputType.FINDER.getAllSelectors().forEach((k,v) -> System.out.println(k + " (" + v.name() + ")"));
InputType.FINDER.getAllSelectors().forEach((k, v) -> System.out.println(k + " (" + v.name() + ")"));
return EXIT_OK;
}
if (options.wantsMarkerTypes()) {
OutputType.FINDER.getAllSelectors().forEach((k,v) -> System.out.println(k + " (" + v.name() + ")"));
OutputType.FINDER.getAllSelectors().forEach((k, v) -> System.out.println(k + " (" + v.name() + ")"));
return EXIT_OK;
}
@ -464,27 +468,27 @@ public class NBCLI implements Function<String[], Integer> {
executor.execute(scenario);
while (true) {
Optional<ScenarioResult> pendingResult = executor.getPendingResult(scenario.getScenarioName());
if (pendingResult.isEmpty()) {
LockSupport.parkNanos(100000000L);
} else {
break;
}
}
// while (true) {
// Optional<ScenarioResult> pendingResult = executor.getPendingResult(scenario.getScenarioName());
// if (pendingResult.isPresent()) {
// break;
// }
// LockSupport.parkNanos(100000000L);
// }
ScenariosResults scenariosResults = executor.awaitAllResults();
logger.debug("Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");
ActivityMetrics.closeMetrics(options.wantsEnableChart());
//scenariosResults.reportToLog();
scenariosResults.reportToLog();
ShutdownManager.shutdown();
// logger.info(scenariosResults.getExecutionSummary());
logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
Exception exception = scenariosResults.getOne().getException().get();
// logger.warn(scenariosResults.getExecutionSummary());
ScenarioErrorHandler.handle(exception, options.wantsStackTraces());
logger.warn(scenariosResults.getExecutionSummary());
NBCLIErrorHandler.handle(exception, options.wantsStackTraces());
System.err.println(exception.getMessage()); // TODO: make this consistent with ConsoleLogging sequencing
return EXIT_ERROR;
} else {

View File

@ -16,16 +16,24 @@
package io.nosqlbench.engine.core.lifecycle;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ActivityExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LogManager.getLogger(ActivityExceptionHandler.class);
private final ActivityExecutor executor;
public ActivityExceptionHandler(ActivityExecutor executor) {
this.executor = executor;
logger.debug(() -> "Activity exception handler starting up for executor '" + executor + "'");
}
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in thread '" + t.getName() + ", state[" + t.getState() + "], notifying executor '" + executor + "'");
executor.notifyException(t, e);
}
}

View File

@ -15,14 +15,14 @@
*/
package io.nosqlbench.engine.core.lifecycle;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -155,8 +155,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
public synchronized RuntimeException forceStopScenario(int initialMillisToWait) {
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activity.setRunState(RunState.Stopped);
executorService.shutdown();
@ -214,23 +214,29 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
public boolean finishAndShutdownExecutor(int secondsToWait) {
activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")");
activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")");
logger.debug("Stopping executor for " + activity.getAlias() + " when work completes.");
executorService.shutdown();
boolean wasStopped = false;
try {
executorService.shutdown();
logger.trace(() -> "awaiting termination with timeout of " + secondsToWait + " seconds");
wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
logger.trace("interrupted while awaiting termination");
wasStopped = false;
logger.warn("while waiting termination of activity " + activity.getAlias() + ", " + ie.getMessage());
logger.warn("while waiting termination of shutdown " + activity.getAlias() + ", " + ie.getMessage());
activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
} catch (RuntimeException e) {
logger.trace("Received exception while awaiting termination: " + e.getMessage());
wasStopped = true;
stoppingException = e;
} finally {
logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias());
activity.shutdownActivity();
logger.trace("closing auto-closeables");
activity.closeAutoCloseables();
activity.setRunState(RunState.Stopped);
@ -241,6 +247,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.trace(() -> "an exception caused the activity to stop:" + stoppingException.getMessage());
throw stoppingException;
}
activitylogger.debug("REQUEST STOP/after alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
return wasStopped;
@ -278,11 +285,13 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
* This is the canonical way to wait for an activity to finish. It ties together
* any way that an activity can finish under one blocking call.
* This should be awaited asynchronously from the control layer in separate threads.
*
* TODO: move activity finisher threaad to this class and remove separate implementation
* <p>
* TODO: move activity finisher thread to this class and remove separate implementation
*/
public boolean awaitCompletion(int waitTime) {
logger.debug(()-> "awaiting completion of '" + this.getActivity().getAlias() + "'");
boolean finished = finishAndShutdownExecutor(waitTime);
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
@ -412,7 +421,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
* Await a thread (aka motor/slot) entering a specific SlotState
*
* @param m motor instance
* @param waitTime milliseconds to wait, total
* @param waitTime milliseco`nds to wait, total
* @param pollTime polling interval between state checks
* @param desiredRunStates any desired SlotState
* @return true, if the desired SlotState was detected
@ -521,7 +530,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
public synchronized void notifyException(Thread t, Throwable e) {
//logger.error("Uncaught exception in activity thread forwarded to activity executor:", e);
logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + e.getMessage());
this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e);
forceStopScenario(10000);
}

View File

@ -16,7 +16,11 @@
package io.nosqlbench.engine.core.lifecycle;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ActivityFinisher extends Thread {
private final static Logger logger = LogManager.getLogger(ActivityFinisher.class);
private final ActivityExecutor executor;
private final int timeout;
@ -30,10 +34,17 @@ public class ActivityFinisher extends Thread {
@Override
public void run() {
logger.debug(this + " awaiting async completion of " + executor.getActivity().getAlias() + " on " + executor + " for timeout " + timeout);
result = executor.awaitCompletion(timeout);
logger.debug(this + " awaited async completion of " + executor.getActivity().getAlias());
}
public boolean getResult() {
return result;
}
@Override
public String toString() {
return this.getClass().getSimpleName()+"/" + executor.getActivity().getAlias();
}
}

View File

@ -17,9 +17,9 @@
package io.nosqlbench.engine.core.lifecycle;
import io.nosqlbench.api.errors.BasicError;
import org.graalvm.polyglot.PolyglotException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.PolyglotException;
import javax.script.ScriptException;
@ -32,28 +32,28 @@ import javax.script.ScriptException;
* <ol>
* <li>Report an error in the most intelligible way to the user.</li>
* </ol>
*
* <p>
* That is all. When this error handler is invoked, it is a foregone conclusion that the scenario
* is not able to continue, else the error would have been trapped and handled internal to a lower-level
* class. It is the calling exception handler's responsibility to finally shut down the scenario
* cleanly and return appropriately. Thus, <em>You should not throw errors from this class. You should only
* unwrap and explain errors, sending contents to the logfile as appropriate.</em>
*
*/
public class ScenarioErrorHandler {
public class NBCLIErrorHandler {
private final static Logger logger = LogManager.getLogger("ERRORHANDLER");
public static String handle(Throwable t, boolean wantsStackTraces) {
if (wantsStackTraces) {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
for (int i = 0; i < 10; i++) {
if (st.length>i) {
if (st.length > i) {
String className = st[i].getClassName();
String fileName = st[i].getFileName();
int lineNumber = st[i].getLineNumber();
logger.trace("st["+i+"]:" + className +","+fileName+":"+lineNumber);
logger.trace("st[" + i + "]:" + className + "," + fileName + ":" + lineNumber);
}
}
}
@ -63,18 +63,18 @@ public class ScenarioErrorHandler {
} else if (t instanceof BasicError) {
logger.trace("Handling basic error: " + t);
return handleBasicError((BasicError) t, wantsStackTraces);
} else if (t instanceof Exception){
} else if (t instanceof Exception) {
logger.trace("Handling general exception: " + t);
return handleInternalError((Exception) t, wantsStackTraces);
} else {
logger.error("Unknown type for error handler: " + t);
throw new RuntimeException("Error in exception handler", t);
logger.error("Unknown type for error handler: " + t);
throw new RuntimeException("Error in exception handler", t);
}
}
private static String handleInternalError(Exception e, boolean wantsStackTraces) {
String prefix = "internal error: ";
if (e.getCause()!=null && !e.getCause().getClass().getCanonicalName().contains("io.nosqlbench")) {
if (e.getCause() != null && !e.getCause().getClass().getCanonicalName().contains("io.nosqlbench")) {
prefix = "Error from driver or included library: ";
}
@ -95,13 +95,13 @@ public class ScenarioErrorHandler {
if (cause instanceof PolyglotException) {
Throwable hostException = ((PolyglotException) cause).asHostException();
if (hostException instanceof BasicError) {
handleBasicError((BasicError)hostException, wantsStackTraces);
handleBasicError((BasicError) hostException, wantsStackTraces);
} else {
handle(hostException, wantsStackTraces);
}
} else {
if (wantsStackTraces) {
logger.error("Unknown script exception:",e);
logger.error("Unknown script exception:", e);
} else {
logger.error(e.getMessage());
logger.error("for the full stack trace, run with --show-stacktraces");
@ -112,7 +112,7 @@ public class ScenarioErrorHandler {
private static String handleBasicError(BasicError e, boolean wantsStackTraces) {
if (wantsStackTraces) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
} else {
logger.error(e.getMessage());
logger.error("for the full stack trace, run with --show-stacktraces");

View File

@ -431,6 +431,7 @@ public class ScenarioController {
* @return true, if all activities completed before the timer expired, false otherwise
*/
public boolean awaitCompletion(long waitTimeMillis) {
logger.debug(() -> "awaiting completion");
boolean completed = true;
long remaining = waitTimeMillis;
@ -443,7 +444,9 @@ public class ScenarioController {
for (ActivityFinisher finisher : finishers) {
try {
logger.debug("joining finisher " + finisher.getName());
finisher.join(waitTimeMillis);
logger.debug("joined finisher " + finisher.getName());
} catch (InterruptedException ignored) {
}
}

View File

@ -54,17 +54,19 @@ public class ScenarioResult {
private final long startedAt;
private final long endedAt;
private Exception exception;
private final Exception exception;
private final String iolog;
public ScenarioResult(String iolog, long startedAt, long endedAt) {
this.iolog = iolog;
this.startedAt = startedAt;
this.endedAt = endedAt;
}
public ScenarioResult(Exception e, long startedAt, long endedAt) {
this.iolog = e.getMessage();
public ScenarioResult(Exception e, String iolog, long startedAt, long endedAt) {
logger.debug("populating "+(e==null? "NORMAL" : "ERROR")+" scenario result");
if (logger.isDebugEnabled()) {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
for (int i = 0; i < st.length; i++) {
logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName());
if (i>10) break;
}
}
this.iolog = ((iolog!=null) ? iolog + "\n\n" : "") + (e!=null? e.getMessage() : "");
this.startedAt = startedAt;
this.endedAt = endedAt;
this.exception = e;
@ -147,15 +149,14 @@ public class ScenarioResult {
StringBuilder sb = new StringBuilder();
ActivityMetrics.getMetricRegistry().getMetrics().forEach((k, v) -> {
if (v instanceof Counting) {
long count = ((Counting) v).getCount();
if (v instanceof Counting counting) {
long count = counting.getCount();
if (count > 0) {
NBMetricsSummary.summarize(sb, k, v);
}
} else if (v instanceof Gauge) {
Object value = ((Gauge) v).getValue();
if (value != null && value instanceof Number) {
Number n = (Number) value;
} else if (v instanceof Gauge<?> gauge) {
Object value = gauge.getValue();
if (value instanceof Number n) {
if (n.doubleValue() != 0) {
NBMetricsSummary.summarize(sb, k, v);
}

View File

@ -27,7 +27,6 @@ import java.util.Map;
public class ScenariosResults {
private static final Logger logger = LogManager.getLogger(ScenariosResults.class);
private final String scenariosExecutorName;
private final Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
@ -77,4 +76,8 @@ public class ScenariosResults {
return this.scenarioResultMap.values().stream()
.anyMatch(r -> r.getException().isPresent());
}
public int getSize() {
return this.scenarioResultMap.size();
}
}

View File

@ -29,15 +29,14 @@ import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.builder.api.*;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
import java.nio.file.attribute.*;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.*;
import java.util.stream.Collectors;
@ -55,10 +54,10 @@ import java.util.stream.Collectors;
public class LoggerConfig extends ConfigurationFactory {
public static Map<String, String> STANDARD_FORMATS = Map.of(
"TERSE", "%8r %-5level [%t] %-12logger{0} %msg%n%throwable",
"VERBOSE", "%d{DEFAULT}{GMT} [%t] %logger %-5level: %msg%n%throwable",
"TERSE-ANSI", "%8r %highlight{%-5level} %style{%C{1.} [%t] %-12logger{0}} %msg%n%throwable",
"VERBOSE-ANSI", "%d{DEFAULT}{GMT} [%t] %highlight{%logger %-5level}: %msg%n%throwable"
"TERSE", "%8r %-5level [%t] %-12logger{0} %msg%n%throwable",
"VERBOSE", "%d{DEFAULT}{GMT} [%t] %logger %-5level: %msg%n%throwable",
"TERSE-ANSI", "%8r %highlight{%-5level} %style{%C{1.} [%t] %-12logger{0}} %msg%n%throwable",
"VERBOSE-ANSI", "%d{DEFAULT}{GMT} [%t] %highlight{%logger %-5level}: %msg%n%throwable"
);
/**
@ -66,7 +65,7 @@ public class LoggerConfig extends ConfigurationFactory {
* we squelch them to some reasonable level so they aren't a nuisance.
*/
public static Map<String, Level> BUILTIN_OVERRIDES = Map.of(
"oshi.util", Level.INFO
"oshi.util", Level.INFO
);
/**
@ -151,20 +150,20 @@ public class LoggerConfig extends ConfigurationFactory {
builder.setStatusLevel(internalLoggingStatusThreshold);
builder.add(
builder.newFilter(
"ThresholdFilter",
Filter.Result.ACCEPT,
Filter.Result.NEUTRAL
).addAttribute("level", builderThresholdLevel)
builder.newFilter(
"ThresholdFilter",
Filter.Result.ACCEPT,
Filter.Result.NEUTRAL
).addAttribute("level", builderThresholdLevel)
);
// CONSOLE appender
AppenderComponentBuilder appenderBuilder =
builder.newAppender("console", "CONSOLE")
.addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
builder.newAppender("console", "CONSOLE")
.addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT);
appenderBuilder.add(builder.newLayout("PatternLayout")
.addAttribute("pattern", consolePattern));
.addAttribute("pattern", consolePattern));
// appenderBuilder.add(
// builder.newFilter("MarkerFilter", Filter.Result.DENY, Filter.Result.NEUTRAL)
@ -174,8 +173,8 @@ public class LoggerConfig extends ConfigurationFactory {
// Log4J internal logging
builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG)
.add(builder.newAppenderRef("console"))
.addAttribute("additivity", false));
.add(builder.newAppenderRef("console"))
.addAttribute("additivity", false));
if (sessionName != null) {
@ -189,55 +188,55 @@ public class LoggerConfig extends ConfigurationFactory {
// LOGFILE appender
LayoutComponentBuilder logfileLayout = builder.newLayout("PatternLayout")
.addAttribute("pattern", logfilePattern);
.addAttribute("pattern", logfilePattern);
String filebase = getSessionName().replaceAll("\\s", "_");
String logfilePath = loggerDir.resolve(filebase + ".log").toString();
this.logfileLocation = logfilePath;
String archivePath = loggerDir.resolve(filebase + "-TIMESTAMP.log.gz").toString()
.replaceAll("TIMESTAMP", "%d{MM-dd-yy}");
.replaceAll("TIMESTAMP", "%d{MM-dd-yy}");
ComponentBuilder triggeringPolicy = builder.newComponent("Policies")
.addComponent(builder.newComponent("CronTriggeringPolicy").addAttribute("schedule", "0 0 0 * * ?"))
.addComponent(builder.newComponent("SizeBasedTriggeringPolicy").addAttribute("size", "100M"));
.addComponent(builder.newComponent("CronTriggeringPolicy").addAttribute("schedule", "0 0 0 * * ?"))
.addComponent(builder.newComponent("SizeBasedTriggeringPolicy").addAttribute("size", "100M"));
AppenderComponentBuilder logsAppenderBuilder =
builder.newAppender("SCENARIO_APPENDER", RollingFileAppender.PLUGIN_NAME)
.addAttribute("fileName", logfilePath)
.addAttribute("filePattern", archivePath)
.addAttribute("append", false)
.add(logfileLayout)
.addComponent(triggeringPolicy);
builder.newAppender("SCENARIO_APPENDER", RollingFileAppender.PLUGIN_NAME)
.addAttribute("fileName", logfilePath)
.addAttribute("filePattern", archivePath)
.addAttribute("append", false)
.add(logfileLayout)
.addComponent(triggeringPolicy);
builder.add(logsAppenderBuilder);
rootBuilder.add(
builder.newAppenderRef("SCENARIO_APPENDER")
.addAttribute("level", fileLevel)
builder.newAppenderRef("SCENARIO_APPENDER")
.addAttribute("level", fileLevel)
);
}
rootBuilder.add(
builder.newAppenderRef("console")
.addAttribute("level",
consoleLevel
)
builder.newAppenderRef("console")
.addAttribute("level",
consoleLevel
)
);
builder.add(rootBuilder);
BUILTIN_OVERRIDES.forEach((k, v) -> {
builder.add(builder.newLogger(k, v)
.add(builder.newAppenderRef("console"))
.add(builder.newAppenderRef("SCENARIO_APPENDER"))
.addAttribute("additivity", true));
.add(builder.newAppenderRef("console"))
.add(builder.newAppenderRef("SCENARIO_APPENDER"))
.addAttribute("additivity", true));
});
logLevelOverrides.forEach((k, v) -> {
Level olevel = Level.valueOf(v);
builder.add(builder.newLogger(k, olevel)
.add(builder.newAppenderRef("console"))
.add(builder.newAppenderRef("SCENARIO_APPENDER"))
.addAttribute("additivity", true));
.add(builder.newAppenderRef("console"))
.add(builder.newAppenderRef("SCENARIO_APPENDER"))
.addAttribute("additivity", true));
});
BuiltConfiguration builtConfig = builder.build();
@ -268,7 +267,7 @@ public class LoggerConfig extends ConfigurationFactory {
if (!Files.exists(loggerDir)) {
try {
FileAttribute<Set<PosixFilePermission>> attrs = PosixFilePermissions.asFileAttribute(
PosixFilePermissions.fromString("rwxrwx---")
PosixFilePermissions.fromString("rwxrwx---")
);
Path directory = Files.createDirectory(loggerDir, attrs);
} catch (Exception e) {
@ -280,22 +279,22 @@ public class LoggerConfig extends ConfigurationFactory {
public LoggerConfig setConsolePattern(String consoleLoggingPattern) {
consoleLoggingPattern= (ansiEnabled && STANDARD_FORMATS.containsKey(consoleLoggingPattern+"-ANSI"))
? consoleLoggingPattern+"-ANSI" : consoleLoggingPattern;
consoleLoggingPattern = (ansiEnabled && STANDARD_FORMATS.containsKey(consoleLoggingPattern + "-ANSI"))
? consoleLoggingPattern + "-ANSI" : consoleLoggingPattern;
this.consolePattern = STANDARD_FORMATS.getOrDefault(consoleLoggingPattern, consoleLoggingPattern);
return this;
}
public LoggerConfig setLogfilePattern(String logfileLoggingPattern) {
logfileLoggingPattern= (logfileLoggingPattern.endsWith("-ANSI") && STANDARD_FORMATS.containsKey(logfileLoggingPattern))
? logfileLoggingPattern.substring(logfileLoggingPattern.length()-5) : logfileLoggingPattern;
logfileLoggingPattern = (logfileLoggingPattern.endsWith("-ANSI") && STANDARD_FORMATS.containsKey(logfileLoggingPattern))
? logfileLoggingPattern.substring(logfileLoggingPattern.length() - 5) : logfileLoggingPattern;
this.logfileLocation = STANDARD_FORMATS.getOrDefault(logfileLoggingPattern, logfileLoggingPattern);
return this;
}
public LoggerConfig getLoggerLevelOverrides(Map<String, String> logLevelOverrides) {
public LoggerConfig setLoggerLevelOverrides(Map<String, String> logLevelOverrides) {
this.logLevelOverrides = logLevelOverrides;
return this;
}
@ -334,9 +333,9 @@ public class LoggerConfig extends ConfigurationFactory {
}
List<File> toDelete = filesList.stream()
.sorted(fileTimeComparator)
.limit(remove)
.collect(Collectors.toList());
.sorted(fileTimeComparator)
.limit(remove)
.collect(Collectors.toList());
for (File file : toDelete) {
logger.info("removing extra logfile: " + file.getPath());

View File

@ -17,8 +17,13 @@ package io.nosqlbench.engine.core.script;
import com.codahale.metrics.MetricRegistry;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;
import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
import io.nosqlbench.api.metadata.SystemId;
import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ActivityProgressIndicator;
@ -27,14 +32,12 @@ import io.nosqlbench.engine.core.lifecycle.ScenarioController;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.metrics.PolyglotMetricRegistryBindings;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
import io.nosqlbench.api.metadata.SystemId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.*;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.EnvironmentAccess;
import org.graalvm.polyglot.HostAccess;
import org.graalvm.polyglot.PolyglotAccess;
import javax.script.Compilable;
import javax.script.CompiledScript;
@ -68,6 +71,12 @@ public class Scenario implements Callable<ScenarioResult> {
private Exception error;
private ScenarioMetadata scenarioMetadata;
private ScenarioResult result;
public Optional<ScenarioResult> getResultIfComplete() {
return Optional.ofNullable(this.result);
}
public enum State {
Scheduled,
@ -162,10 +171,9 @@ public class Scenario implements Callable<ScenarioResult> {
return this;
}
private void init() {
private void initializeScriptingEngine() {
logger.debug("Using engine " + engine.toString());
MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
Context.Builder contextSettings = Context.newBuilder("js")
@ -183,7 +191,7 @@ public class Scenario implements Callable<ScenarioResult> {
.option("js.nashorn-compat", "true");
org.graalvm.polyglot.Engine.Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
engineBuilder.option("engine.WarnInterpreterOnly","false");
engineBuilder.option("engine.WarnInterpreterOnly", "false");
org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
// TODO: add in, out, err for this scenario
@ -205,9 +213,9 @@ public class Scenario implements Callable<ScenarioResult> {
// scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
// scriptEngine.put("activities", new NashornActivityBindings(scenarioController));
scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
scriptEngine.put("activities", new NashornActivityBindings(scenarioController));
scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
scriptEngine.put("activities", new NashornActivityBindings(scenarioController));
for (ScriptingPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
if (!extensionDescriptor.isAutoLoading()) {
@ -241,12 +249,11 @@ public class Scenario implements Callable<ScenarioResult> {
return scenarioMetadata;
}
public void runScenario() {
private synchronized void runScenario() {
scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(scenarioShutdownHook);
state = State.Running;
startedAtMillis = System.currentTimeMillis();
Annotators.recordAnnotation(
Annotation.newBuilder()
@ -256,21 +263,21 @@ public class Scenario implements Callable<ScenarioResult> {
.detail("engine", this.engine.toString())
.build()
);
init();
initializeScriptingEngine();
logger.debug("Running control script for " + getScenarioName() + ".");
for (String script : scripts) {
try {
Object result = null;
if (scriptEngine instanceof Compilable && wantsCompiledScript) {
if (scriptEngine instanceof Compilable compilableEngine && wantsCompiledScript) {
logger.debug("Using direct script compilation");
Compilable compilableEngine = (Compilable) scriptEngine;
CompiledScript compiled = compilableEngine.compile(script);
logger.debug("-> invoking main scenario script (compiled)");
result = compiled.eval();
logger.debug("<- scenario script completed (compiled)");
} else {
if (scriptfile != null && !scriptfile.isEmpty()) {
String filename = scriptfile.replace("_SESSION_", scenarioName);
logger.debug("-> invoking main scenario script (" +
"interpreted from " + filename + ")");
@ -292,16 +299,21 @@ public class Scenario implements Callable<ScenarioResult> {
}
if (result != null) {
logger.debug("scenario result: type(" + result.getClass().getCanonicalName() + "): value:" + result.toString());
logger.debug("scenario result: type(" + result.getClass().getCanonicalName() + "): value:" + result);
}
System.err.flush();
System.out.flush();
} catch (Exception e) {
this.state = State.Errored;
logger.error("Error in scenario, shutting down. (" + e.toString() + ")");
this.scenarioController.forceStopScenario(5000, false);
this.error = e;
throw new RuntimeException(e);
logger.error("Error in scenario, shutting down. (" + e + ")");
try {
this.scenarioController.forceStopScenario(5000, false);
} catch (Exception eInner) {
logger.debug("Found inner exception while forcing stop with rethrow=false: " + eInner);
} finally {
this.error = e;
throw new RuntimeException(e);
}
} finally {
System.out.flush();
System.err.flush();
@ -355,14 +367,29 @@ public class Scenario implements Callable<ScenarioResult> {
return endedAtMillis;
}
public ScenarioResult call() {
runScenario();
String iolog = scriptEnv.getTimedLog();
ScenarioResult result = new ScenarioResult(iolog, this.startedAtMillis, this.endedAtMillis);
/**
* This should be the only way to get a ScenarioResult for a Scenario.
*
* @return
*/
public synchronized ScenarioResult call() {
if (result == null) {
try {
runScenario();
} catch (Exception e) {
if (this.error!=null) {
logger.debug("OVERLAPPING ERRORS: prior" + this.error.getMessage() + ", current:" + e.getMessage());
}
this.error = e;
} finally {
logger.debug((this.error == null ? "NORMAL" : "ERRORED") + " scenario run");
}
result.reportToLog();
doReportSummaries(reportSummaryTo, result);
String iolog = scriptEnv.getTimedLog();
this.result = new ScenarioResult(this.error, iolog, this.startedAtMillis, this.endedAtMillis);
result.reportToLog();
doReportSummaries(reportSummaryTo, result);
}
return result;
}

View File

@ -16,10 +16,7 @@
package io.nosqlbench.engine.core.script;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.ScenarioController;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.ScenariosResults;
import io.nosqlbench.engine.core.lifecycle.*;
import io.nosqlbench.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -43,9 +40,9 @@ public class ScenariosExecutor {
public ScenariosExecutor(String name, int threads) {
executor = new ThreadPoolExecutor(1, threads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new IndexedThreadFactory("scenarios", new ScenarioExceptionHandler(this)));
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new IndexedThreadFactory("scenarios", new ScenarioExceptionHandler(this)));
this.name = name;
}
@ -92,7 +89,6 @@ public class ScenariosExecutor {
long waitedAt = System.currentTimeMillis();
long updateAt = Math.min(timeoutAt, waitedAt + updateInterval);
while (!isShutdown && System.currentTimeMillis() < timeoutAt) {
while (!isShutdown && System.currentTimeMillis() < updateAt) {
try {
long timeRemaining = updateAt - System.currentTimeMillis();
@ -108,11 +104,17 @@ public class ScenariosExecutor {
if (!isShutdown) {
throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
}
Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
getAsyncResultStatus()
.entrySet().forEach(es -> scenarioResultMap.put(es.getKey(), es.getValue().orElse(null)));
.entrySet()
.forEach(
es -> scenarioResultMap.put(
es.getKey(),
es.getValue().orElse(null)
)
);
return new ScenariosResults(this, scenarioResultMap);
}
@ -121,9 +123,9 @@ public class ScenariosExecutor {
*/
public List<String> getPendingScenarios() {
return new ArrayList<>(
submitted.values().stream()
.map(SubmittedScenario::getName)
.collect(Collectors.toCollection(ArrayList::new)));
submitted.values().stream()
.map(SubmittedScenario::getName)
.collect(Collectors.toCollection(ArrayList::new)));
}
/**
@ -149,7 +151,8 @@ public class ScenariosExecutor {
oResult = Optional.of(resultFuture.get());
} catch (Exception e) {
long now = System.currentTimeMillis();
oResult = Optional.of(new ScenarioResult(e, now, now));
logger.debug("creating exceptional scenario result from getAsyncResultStatus");
oResult = Optional.of(new ScenarioResult(e, "errored output", now, now));
}
}
@ -176,23 +179,8 @@ public class ScenariosExecutor {
* @param scenarioName the scenario name of interest
* @return an optional result
*/
public Optional<ScenarioResult> getPendingResult(String scenarioName) {
Future<ScenarioResult> resultFuture1 = submitted.get(scenarioName).resultFuture;
if (resultFuture1 == null) {
throw new BasicError("Unknown scenario name:" + scenarioName);
}
long now = System.currentTimeMillis();
if (resultFuture1.isDone()) {
try {
return Optional.ofNullable(resultFuture1.get());
} catch (Exception e) {
return Optional.of(new ScenarioResult(e, now, now));
}
} else if (resultFuture1.isCancelled()) {
return Optional.of(new ScenarioResult(new Exception("result was cancelled."), now, now));
}
return Optional.empty();
public Optional<Future<ScenarioResult>> getPendingResult(String scenarioName) {
return Optional.ofNullable(submitted.get(scenarioName)).map(s -> s.resultFuture);
}
public synchronized void stopScenario(String scenarioName) {
@ -200,6 +188,7 @@ public class ScenariosExecutor {
}
public synchronized void stopScenario(String scenarioName, boolean rethrow) {
logger.debug("#stopScenario(name=" + scenarioName + ", rethrow="+ rethrow+")");
Optional<Scenario> pendingScenario = getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) {
ScenarioController controller = pendingScenario.get().getScenarioController();
@ -256,6 +245,7 @@ public class ScenariosExecutor {
}
public synchronized void notifyException(Thread t, Throwable e) {
logger.debug(() -> "Scenario executor uncaught exception: " + e.getMessage());
this.stoppingException = new RuntimeException("Error in scenario thread " + t.getName(), e);
}

View File

@ -19,19 +19,27 @@ package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.script.Scenario;
import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ScenarioTest {
private final Logger logger = LogManager.getLogger(ScenarioTest.class);
@Test
public void shouldLoadScriptText() {
ScriptEnvBuffer buffer = new ScriptEnvBuffer();
Scenario env = new Scenario("testing", Scenario.Engine.Graalvm, "stdout:300", Maturity.Any);
env.addScriptText("print('loaded script environment...');\n");
env.runScenario();
assertThat(env.getIOLog().get().get(0)).contains("loaded script environment...");
Scenario scenario = new Scenario("testing", Scenario.Engine.Graalvm, "stdout:300", Maturity.Any);
scenario.addScriptText("print('loaded script environment...');\n");
try {
var result=scenario.call();
} catch (Exception e) {
logger.debug("Scenario run encountered an exception: " + e.getMessage());
}
assertThat(scenario.getIOLog().get().get(0)).contains("loaded script environment...");
}
}

View File

@ -23,13 +23,13 @@
<dependency>
<groupId>io.swagger.parser.v3</groupId>
<artifactId>swagger-parser</artifactId>
<version>2.1.4</version>
<version>2.1.7</version>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-models</artifactId>
<version>2.2.3</version>
<version>2.2.4</version>
</dependency>
<dependency>

View File

@ -41,6 +41,7 @@ import org.joda.time.format.DateTimeFormat;
import java.io.CharArrayWriter;
import java.io.PrintWriter;
import java.util.*;
import java.util.concurrent.Future;
@Service(value = WebServiceObject.class, selector = "scenario-executor")
@Singleton
@ -233,8 +234,9 @@ public class ScenarioExecutorEndpoint implements WebServiceObject {
Optional<Scenario> pendingScenario = executor.getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) {
Optional<ScenarioResult> pendingResult = executor.getPendingResult(scenarioName);
return new LiveScenarioView(pendingScenario.get(), pendingResult.orElse(null));
Optional<Future<ScenarioResult>> pendingResult = executor.getPendingResult(scenarioName);
Future<ScenarioResult> scenarioResultFuture = pendingResult.get();
return new LiveScenarioView(pendingScenario.get());
} else {
throw new RuntimeException("Scenario name '" + scenarioName + "' not found.");
}

View File

@ -19,7 +19,6 @@ package io.nosqlbench.engine.rest.transfertypes;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.script.Scenario;
import java.util.ArrayList;
@ -29,21 +28,16 @@ import java.util.List;
public class LiveScenarioView {
private final Scenario scenario;
private final ScenarioResult result;
public LiveScenarioView(Scenario scenario, ScenarioResult result) {
public LiveScenarioView(Scenario scenario) {
this.scenario = scenario;
this.result = result;
}
@JsonProperty
@JsonPropertyDescription("Optionally populated result, "+
" present only if there was an error or the scenario is complete")
public ResultView getResult() {
if (result==null) {
return null;
}
return new ResultView(result);
return new ResultView(scenario.getResultIfComplete().orElse(null));
}
@JsonProperty("scenario_name")

View File

@ -27,14 +27,17 @@ public class ResultView {
}
public String getIOLog() {
return result.getIOLog();
}
public String getError() {
if (result.getException().isPresent()) {
return result.getException().get().getMessage();
if (result!=null) {
return result.getIOLog();
} else {
return "";
}
}
public String getError() {
if (result!=null && result.getException().isPresent()) {
return result.getException().get().getMessage();
}
return "";
}
}

View File

@ -98,7 +98,7 @@
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
<version>4.6.3</version>
<version>4.7.0</version>
</dependency>
<!-- due to https://github.com/oshi/oshi/issues/1289 -->
@ -135,7 +135,7 @@
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<version>4.2.10</version>
<version>4.2.12</version>
</dependency>
<dependency>
@ -189,7 +189,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.82.Final</version>
<version>4.1.84.Final</version>
</dependency>
<dependency>
@ -265,13 +265,13 @@
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core-java11</artifactId>
<version>6.2.2</version>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.1</version>
<version>2.10</version>
</dependency>
<dependency>
@ -301,7 +301,7 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.11.2</version>
<version>2.12.1</version>
</dependency>
<dependency>
@ -360,23 +360,23 @@
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<version>21.3.3.1</version>
<version>21.3.4</version>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js</artifactId>
<version>21.3.3.1</version>
<version>21.3.4</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js-scriptengine</artifactId>
<version>21.3.3.1</version>
<version>21.3.4</version>
</dependency>
<dependency>
<groupId>org.graalvm.tools</groupId>
<artifactId>profiler</artifactId>
<version>20.3.6.1</version>
<version>20.3.8</version>
<scope>runtime</scope>
</dependency>
<dependency>

View File

@ -97,7 +97,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.330</version>
<version>1.12.340</version>
</dependency>
<dependency>

View File

@ -57,28 +57,28 @@ class ExitStatusIntegrationTests {
assertThat(result.exitStatus).isEqualTo(2);
}
// Temporarily disabled for triage
// TODO: figure out if github actions is an issue for this test.
// It passes locally, but fails spuriously in github actions runner
// @Test
// public void testExitStatusOnActivityThreadException() {
// ProcessInvoker invoker = new ProcessInvoker();
// invoker.setLogDir("logs/test");
// ProcessResult result = invoker.run("exitstatus_threadexception", 30,
// "java", "-jar", JARNAME, "--logs-dir", "logs/test", "run", "driver=diag", "throwoncycle=10", "cycles=1000", "cyclerate=10", "-vvv"
// );
// String stdout = result.getStdoutData().stream().collect(Collectors.joining("\n"));
// assertThat(stdout).contains("Diag was asked to throw an error on cycle 10");
// assertThat(result.exitStatus).isEqualTo(2);
// }
@Test
void testExitStatusOnActivityBasicCommandException() {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
// Forcing a thread exception via basic command issue.
ProcessResult result = invoker.run("exitstatus_threadexception", 30,
"java", "-jar", JARNAME, "--logs-dir", "logs/test/threadexcep", "--logs-level", "debug", "run",
"driver=diag", "cyclerate=10", "not_a_thing", "cycles=100", "-vvv"
);
String stdout = String.join("\n", result.getStdoutData());
assertThat(stdout).contains("Could not recognize command");
assertThat(result.exitStatus).isEqualTo(2);
}
@Test
void testExitStatusOnActivityOpException() {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
ProcessResult result = invoker.run("exitstatus_asyncstoprequest", 30,
java, "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "run",
"driver=diag", "cyclerate=1", "op=erroroncycle:erroroncycle=10", "cycles=2000", "-vvv"
"java", "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "--logs-level", "debug", "run",
"driver=diag", "threads=2", "cyclerate=10", "op=erroroncycle:erroroncycle=10", "cycles=500", "-vvv"
);
assertThat(result.exception).isNull();
String stdout = String.join("\n", result.getStdoutData());
@ -86,5 +86,18 @@ class ExitStatusIntegrationTests {
assertThat(result.exitStatus).isEqualTo(2);
}
// This will not work reliablyl until the activity shutdown bug is fixed.
// @Test
// public void testCloseErrorHandlerOnSpace() {
// ProcessInvoker invoker = new ProcessInvoker();
// invoker.setLogDir("logs/test");
// ProcessResult result = invoker.run("exitstatus_erroronclose", 30,
// java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run",
// "driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv"
// );
// String stdout = String.join("\n", result.getStdoutData());
// String stderr = String.join("\n", result.getStderrData());
// assertThat(result.exception).isNotNull();
// assertThat(result.exception.getMessage()).contains("diag space was configured to throw");
// }
}

View File

@ -16,10 +16,16 @@
package io.nosqlbench.cli.testing;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class ProcessInvoker {
private static final Logger logger = LogManager.getLogger(ProcessInvoker.class);
private File runDirectory = new File(".");
private File logDirectory = new File(".");
@ -49,13 +55,17 @@ public class ProcessInvoker {
try {
result.cmdDir = new File(".").getCanonicalPath();
process = pb.start();
var handle = process.toHandle();
boolean terminated = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!terminated) {
process.destroyForcibly().waitFor();
result.exception = new RuntimeException("timed out waiting for process, so it was shutdown forcibly.");
}
} catch (Exception e) {
if (process != null) {
logger.debug("Exception received, with exit value: " + process.exitValue());
}
result.exception = e;
} finally {
result.startNanosTime = startNanosTime;
@ -66,7 +76,7 @@ public class ProcessInvoker {
if (process != null) {
result.exitStatus = process.exitValue();
} else {
result.exitStatus=255;
result.exitStatus = 255;
}
}
return result;