merge nb4-rc1 to main

This commit is contained in:
Jonathan Shook 2020-12-11 14:50:21 -06:00
commit 86624c5b53
91 changed files with 5077 additions and 8917 deletions

View File

@ -16,7 +16,7 @@ jobs:
- name: setup java
uses: actions/setup-java@v1
with:
java-version: '14'
java-version: '15'
java-package: jdk
architecture: x64

View File

@ -1,4 +1,4 @@
FROM openjdk:14-alpine
FROM openjdk:15-alpine
RUN apk --no-cache add curl
COPY nb/target/nb.jar nb.jar

View File

@ -0,0 +1,8 @@
- 526d09cd (HEAD -> nb4-rc1) auto create dirs for grafana_apikey
- b4ec4c9a (origin/nb4-rc1) trigger build
- af87ef9c relaxed requirement for finicky test
- 3436ec61 trigger build
- 17ed4c1e annotator and dashboard fixes
- 4dab9b89 move annotations enums to package
- 6d514cb6 bump middle version number to required java version '15'
- fa78e27f set NB4 to Java 15

View File

@ -0,0 +1,244 @@
## RateLimiter Design
The nosqlbench rate limiter is a hybrid design, combining ideas from
well-known algorithms with a heavy dose of mechanical sympathy. The
resulting implementation provides the following:
1. A basic design that can be explained in one page (this page!)
2. High throughput, compared to other rate limiters tested.
3. Graceful degradation with increasing concurrency.
4. Clearly defined behavioral semantics.
5. Efficient burst capability, for tunable catch-up rates.
6. Efficient calculation of wait time.
## Parameters
**rate** - In simplest terms, users simply need to configure the *rate*.
For example, `rate=12000` specifies an op rate of 12000 ops/second.
**burst rate** - Additionally, users may specify a burst rate which can be
used to recover unused time when a client is able to go faster than the
strict limit. The burst rate is multiplied by the _op rate_ to arrive at
the maximum rate when wait time is available to recover. For
example, `rate=12000,1.1`
specifies that a client may operate at 12000 ops/s _when it is caught up_,
while allowing it to go at a rate of up to 13200 ops/s _when it is behind
schedule_.
## Design Principles
The core design of the rate limiter is based on
the [token bucket](https://en.wikipedia.org/wiki/Token_bucket) algorithm
as established in the telecom industry for rate metering. Additional
refinements have been added to allow for flexible and reliable use on
non-realtime systems.
The unit of scheduling used in this design is the token, corresponding
directly to a nanosecond of time. The scheduling time that is made
available to callers is stored in a pool of tokens which is set to a
configured size. The size of the token pool determines how many grants are
allowed to be dispatched before the next one is forced to wait for
available tokens.
At some regular frequency, a filler thread adds tokens (nanoseconds of
time to be distributed to waiting ops) to the pool. The callers which are
waiting for these tokens consume a number of tokens serially. If the pool
does not contain the requested number of tokens, then the caller is
blocked using basic synchronization primitives. When the pool is filled
any blocked callers are unblocked.
The hybrid rate limiter tracks and accumulates both the passage of system
time and the usage rate of this time as a measurement of progress. The
delta between these two reference points in time captures a very simple
and empirical value of imposed wait time.
That is, the time which was allocated but which was not used always
represents a slow down which is imposed by external factors. This
manifests as slower response when considering the target rate to be
equivalent to user load.
## Design Details
In fact, there are three pools. The _active_ pool, the _bursting_ pool,
and the
_waiting_ pool. The active pool has a limited size based on the number of
operations that are allowed to be granted concurrently.
The bursting pool is sized according to the relative burst rate and the
size of the active pool. For example, with an op rate of 1000 ops/s and a
burst rate of 1.1, the active pool can be sized to 1E9 nanos (one second
of nanos), and the burst pool can be sized to 1E8 (1/10 of that), thus
yielding a combined pool size of 1E9 + 1E8, or 1100000000 ns.
The waiting pool is where all extra tokens are held in reserve. It is
unlimited except by the size of a long value. The size of the waiting pool
is a direct measure of wait time in nanoseconds.
Within the pools, tokens (time) are neither created nor destroyed. They
are added by the filler based on the passage of time, and consumed by
callers when they become available. In between these operations, the net
sum of tokens is preserved. In short, when time deltas are observed in the
system clock, this time is accumulated into the available scheduling time
of the token pools. In this way, the token pool acts as a metered
dispenser of scheduling time to waiting (or not) consumers.
The filler thread adds tokens to the pool according to the system
real-time clock, at some estimated but unreliable interval. The frequency
of filling is set high enough to give a reliable perception of time
passing smoothly, but low enough to avoid wasting too much thread time in
calling overhead. (It is set to 1K/s by default). Each time filling
occurs, the real-time clock is check-pointed, and the time delta is fed
into the pool filling logic as explained below.
## Visual Explanation
The diagram below explains the moving parts of the hybrid rate limiter.
The arrows represent the flow of tokens (ns) as a form of scheduling
currency.
The top box shows an active token filler thread which polls the system
clock and accumulates new time into the token pool.
The bottom boxes represent concurrent readers of the token pool. These are
typically independent threads which do a blocking read for tokens once
they are ready to execute the rate-limited task.
![Hybrid Ratelimiter Schematic](hybrid_ratelimiter.png)
In the middle, the passive component in this diagram is the token pool
itself. When the token filler adds tokens, it never blocks. However, the
token filler can cause any readers of the token pool to unblock so that
they can acquire newly available tokens.
When time is added to the token pool, the following steps are taken:
1) New tokens (based on measured time elapsed since the last fill) are
added to the active pool until it is full.
2) Any extra tokens are added to the waiting pool.
3) If the waiting pool has any tokens, and there is room in the bursting
pool, some tokens are moved from the waiting pool to the bursting pool
according to how many will fit.
When a caller asks for a number of tokens, the combined total from the
active and burst pools is available to that caller. If the number of
tokens needed is not yet available, then the caller will block until
tokens are added.
## Bursting Logic
Tokens in the waiting pool represent time that has not been claimed by a
caller. Tokens accumulate in the waiting pool as a side-effect of
continuous filling outpacing continuous draining, thus creating a backlog
of operations.
The pool sizes determine both the maximum instantaneously available
operations as well as the rate at which unclaimed time can be back-filled
back into the active or burst pools.
### Normalizing for Jitter
Since it is not possible to schedule the filler thread to trigger on a
strict and reliable schedule (as in a real-time system), the method of
moving tokens from the waiting pool to the bursting pool must account for
differences in timing. Thus, tokens which are activated for bursting are
scaled according to the amount of time added in the last fill, relative to
the maximum active pool. This means that a full pool fill will allow a
full burst pool fill, presuming wait time is positive by that amount. It
also means that the same effect can be achieved by ten consecutive fills
of a tenth the time each. In effect, bursting is normalized to the passage
of time along with the burst rate, with a maximum cap imposed when
operations are unclaimed by callers.
## Mechanical Trade-offs
In this implementation, it is relatively easy to explain how accuracy and
performance trade-off. They are competing concerns. Consider these two
extremes of an isochronous configuration:
### Slow Isochronous
For example, the rate limiter could be configured for strict isochronous
behavior by setting the active pool size to *one* op of nanos and the
burst rate to 1.0, thus disabling bursting. If the op rate requested is 1
op/s, this configuration will work relatively well, although *any* caller
which doesn't show up (or isn't already waiting) when the tokens become
available will incur a waittime penalty. The odds of this are relatively
low for a high-velocity client.
### Fast Isochronous
However, if the op rate for this type of configuration is set to 1E8
operations per second, then the filler thread will be adding 100 ops worth
of time when there is only *one* op worth of active pool space. This is
due to the fact that filling can only occur at a maximal frequency which
has been set to 1K fills/s on average. That will create artificial wait
time, since the token consumers and producers would not have enough pool
space to hold the tokens needed during fill. It is not possible on most
systems to fill the pool at arbitrarily high fill frequencies. Thus, it is
important for users to understand the limits of the machinery when using
high rates. In most scenarios, these limits will not be onerous.
### Boundary Rules
Taking these effects into account, the default configuration makes some
reasonable trade-offs according to the rules below. These rules should
work well for most rates below 50M ops/s. The net effect of these rules is
to increase work bulking within the token pools as rates go higher.
Trying to go above 50M ops/s while also forcing isochronous behavior will
result in artificial wait-time. For this reason, the pool size itself is
not user-configurable at this time.
- The pool size will always be at least as big as two ops. This rule
ensures that there is adequate buffer space for tokens when callers are
accessing the token pools near the rate of the filler thread. If this
were not ensured, then artificial wait time would be injected due to
overflow error.
- The pool size will always be at least as big as 1E6 nanos, or 1/1000 of
a second. This rule ensures that the filler thread has a reasonably
attainable update frequency which will prevent underflow in the active
or burst pools.
- The number of ops that can fit in the pool will determine how many ops
can be dispatched between fills. For example, an op rate of 1E6 will
mean that up to 1000 ops worth of tokens may be present between fills,
and up to 1000 ops may be allowed to start at any time before the next
fill.
.1 ops/s : .2 seconds worth 1 ops/s : 2 seconds worth 100 ops/s : 2
seconds worth
In practical terms, this means that rates slower than 1K ops/S will have
their strictness controlled by the burst rate in general, and rates faster
than 1K ops/S will automatically include some op bulking between fills.
## History
A CAS-oriented method which compensated for RTC calling overhead was used
previously. This method afforded very high performance, but it was
difficult to reason about.
This implementation replaces that previous version. Basic synchronization
primitives (implicit locking via synchronized methods) performed
surprisingly well -- well enough to discard the complexity of the previous
implementation.
Further, this version is much easier to study and reason about.
## New Challenges
While the current implementation works well for most basic cases, high CPU
contention has shown that it can become an artificial bottleneck. Based on
observations on higher end systems with many cores running many threads
and high target rates, it appears that the rate limiter becomes a resource
blocker or forces too much thread management.
Strategies for handling this should be considered:
1) Make callers able to pseudo-randomly (or not randomly) act as a token
filler, such that active consumers can do some work stealing from the
original token filler thread.
2) Analyze the timing and history of a high-contention scenario for
weaknesses in the parameter adjustment rules above.
3) Add internal micro-batching at the consumer interface, such that
contention cost is lower in general.
4) Partition the rate limiter into multiple slices.

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

File diff suppressed because it is too large Load Diff

After

Width:  |  Height:  |  Size: 143 KiB

View File

@ -9,7 +9,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
@ -98,7 +98,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -34,11 +34,6 @@ public class CqlActivityType implements ActivityType<CqlActivity> {
throw new RuntimeException("Currently, the cql activity type requires yaml/workload activity parameter.");
}
// allow shortcut: yaml parameter provide the default alias name
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
activityDef.getParams().set("alias",yaml.get());
}
return new CqlActivity(activityDef);
}

View File

@ -3,7 +3,6 @@ package io.nosqlbench.activitytype.cqld4.core;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
import io.nosqlbench.activitytype.cqld4.codecsupport.UDTJavaType;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@ -35,11 +34,6 @@ public class CqlActivityType implements ActivityType<CqlActivity> {
throw new RuntimeException("Currently, the cql activity type requires yaml/workload activity parameter.");
}
// allow shortcut: yaml parameter provide the default alias name
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
activityDef.getParams().set("alias",yaml.get());
}
return new CqlActivity(activityDef);
}

View File

@ -79,7 +79,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
@Override
public void startOpCycle(TrackedOp<CqlOpData> opc) {
CqlOpData cqlop = opc.getData();
CqlOpData cqlop = opc.getOpData();
long cycle = opc.getCycle();
// bind timer covers all statement selection and binding, skipping, transforming logic
@ -123,7 +123,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
public void onSuccess(StartedOp<CqlOpData> sop, AsyncResultSet resultSet) {
CqlOpData cqlop = sop.getData();
CqlOpData cqlop = sop.getOpData();
HashedCQLErrorHandler.resetThreadStatusCode();
if (cqlop.skipped) {
@ -218,7 +218,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
public void onFailure(StartedOp<CqlOpData> startedOp) {
CqlOpData cqlop = startedOp.getData();
CqlOpData cqlop = startedOp.getOpData();
long serviceTime = startedOp.getCurrentServiceTimeNanos();
// Even if this is retryable, we expose error events

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,13 +24,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,13 +20,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,11 +7,10 @@ import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.virtdata.core.templates.StringBindings;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.io.FileNotFoundException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
@ -28,14 +27,11 @@ public class HttpAction implements SyncAction {
private final HttpActivity httpActivity;
private final int slot;
private final int maxTries = 1;
private boolean showstmts;
private OpSequence<ReadyHttpOp> sequencer;
private HttpClient client;
private final HttpResponse.BodyHandler<String> bodyreader = HttpResponse.BodyHandlers.ofString();
private final long timeoutMillis=30000L;
public HttpAction(ActivityDef activityDef, int slot, HttpActivity httpActivity) {
this.slot = slot;
@ -54,25 +50,15 @@ public class HttpAction implements SyncAction {
@Override
public int runCycle(long cycleValue) {
StringBindings stringBindings;
String statement = null;
InputStream result = null;
// The request to be used must be constructed from the template each time.
HttpOp httpOp = null;
// The bind timer captures all the time involved in preparing the
// operation for execution, including data generation as well as
// op construction
// The request to be used must be constructed from the template each time.
HttpOp httpOp=null;
// A specifier for what makes a response ok. If this is provided, then it is
// either a list of valid http status codes, or if non-numeric, a regex for the body
// which must match.
// If not provided, then status code 200 is the only thing required to be matched.
String ok;
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
ReadyHttpOp readHTTPOperation = httpActivity.getSequencer().get(cycleValue);
ReadyHttpOp readHTTPOperation = sequencer.get(cycleValue);
httpOp = readHTTPOperation.apply(cycleValue);
} catch (Exception e) {
if (httpActivity.isDiagnosticMode()) {
@ -135,98 +121,10 @@ public class HttpAction implements SyncAction {
}
}
// if (ok == null) {
// if (response.statusCode() != 200) {
// throw new ResponseError("Result had status code " +
// response.statusCode() + ", but 'ok' was not set for this statement," +
// "so it is considered an error.");
// }
// } else {
// String[] oks = ok.split(",");
// for (String ok_condition : oks) {
// if (ok_condition.charAt(0)>='0' && ok_condition.charAt(0)<='9') {
// int matching_status = Integer.parseInt(ok_condition);
// } else {
// Pattern successRegex = Pattern.compile(ok);
// }
// }
//// Matcher matcher = successRegex.matcher(String.valueOf(response.statusCode()));
//// if (!matcher.matches()) {
//// throw new BasicError("status code " + response.statusCode() + " did not match " + success);
//// }
// }
}
return 0;
}
// String body = future.body();
// String[] splitStatement = statement.split("\\?");
// String path, query;
//
// path = splitStatement[0];
// query = "";
//
// if (splitStatement.length >= 2) {
// query = splitStatement[1];
// }
//
// URI uri = new URI(
// "http",
// null,
// host,
// httpActivity.getPort(),
// path,
// query,
// null);
//
// statement = uri.toString();
//
// showstmts = httpActivity.getShowstmts();
// if (showstmts) {
// logger.info("STMT(cycle=" + cycleValue + "):\n" + statement);
// }
// } catch (URISyntaxException e) {
// e.printStackTrace();
// }
//
// long nanoStartTime=System.nanoTime();
//
// Timer.Context resultTime = httpActivity.resultTimer.time();
// try {
// StringBuilder res = new StringBuilder();
//
// BufferedReader rd = new BufferedReader(new InputStreamReader(result));
// String line;
// while ((line = rd.readLine()) != null) {
// res.append(line);
// }
// rd.close();
//
// } catch (Exception e) {
// long resultNanos = resultTime.stop();
// resultTime = null;
// } finally {
// if (resultTime != null) {
// resultTime.stop();
// }
//
// }
//
// }
// long resultNanos = System.nanoTime() - nanoStartTime;
// httpActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
// protected HttpActivity getHttpActivity () {
// return httpActivity;
// }
// }
private HttpRequest.BodyPublisher bodySourceFrom(Map<String, String> cmdMap) {
if (cmdMap.containsKey("body")) {
String body = cmdMap.remove("body");

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -44,13 +44,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -21,13 +21,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -27,11 +27,6 @@ public class StdoutActivityType implements ActivityType<StdoutActivity> {
".");
}
// allow shortcut: yaml parameter provide the default alias name
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
activityDef.getParams().set("alias",yaml.get());
}
return new StdoutActivity(activityDef);
}

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,19 +24,19 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,25 +23,25 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -21,8 +21,9 @@ import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.logging.log4j.Logger;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicLong;
@ -76,11 +77,12 @@ import java.util.concurrent.atomic.AtomicLong;
* overall workload is not saturating resources.
* </p>
*/
@Service(value = RateLimiter.class, selector = "hybrid")
public class HybridRateLimiter implements Startable, RateLimiter {
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
private volatile TokenFiller filler;
//private volatile TokenFiller filler;
private volatile long starttime;
// rate controls
@ -150,8 +152,9 @@ public class HybridRateLimiter implements Startable, RateLimiter {
}
this.rateSpec = updatingRateSpec;
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
this.tokens = this.filler.getTokenPool();
this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
// this.tokens = this.filler.getTokenPool();
if (this.state == State.Idle && updatingRateSpec.isAutoStart()) {
this.start();
@ -177,7 +180,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Idle:
long nanos = getNanoClockTime();
this.starttime = nanos;
this.filler.start();
this.tokens.start();
state = State.Started;
break;
}
@ -191,7 +194,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Started:
long accumulatedWaitSinceLastStart = cumulativeWaitTimeNanos.get();
cumulativeWaitTimeNanos.set(0L);
return this.filler.restart() + accumulatedWaitSinceLastStart;
return this.tokens.restart() + accumulatedWaitSinceLastStart;
default:
return 0L;
}
@ -215,14 +218,14 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
if (this.getRateSpec()!=null) {
if (this.getRateSpec() != null) {
sb.append(" spec=").append(this.getRateSpec().toString());
}
if (this.state!=null) {
if (this.state != null) {
sb.append(" state=").append(this.state);
}
if (this.filler !=null) {
sb.append(" filler=").append(this.filler.toString());
if (this.tokens != null) {
sb.append(" tokens=").append(this.tokens.toString());
}
return sb.toString();
}
@ -245,7 +248,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override
public Long getValue() {
TokenPool pool = rl.filler.getTokenPool();
TokenPool pool = rl.tokens;
if (pool==null) {
return 0L;
}

View File

@ -0,0 +1,269 @@
/*
*
* Copyright 2016 jshook
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import static io.nosqlbench.engine.api.util.Colors.*;
/**
* <h2>Synopsis</h2>
*
* This TokenPool represents a finite quantity which can be
* replenished with regular refills. Extra tokens that do not fit
* within the active token pool are saved in a waiting token pool and
* used to backfill when allowed according to the backfill rate.
*
* A detailed explanation for how this works will be included
* at @link "http://docs.nosqlbench.io/" under dev notes.
*
* <p>This is the basis for the token-based rate limiters in
* NB. This mechanism is easily adaptable to bursting
* capability as well as a degree of stricter timing at speed.
* Various methods for doing this in a lock free way were
* investigated, but the intrinsic locks provided by synchronized
* method won out for now. This may be revisited when EB is
* retrofitted for J11.
* </p>
*/
@Service(value = TokenPool.class, selector = "threaded")
public class ThreadDrivenTokenPool implements TokenPool {
private final static Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
public static final double MIN_CONCURRENT_OPS = 2;
private long maxActivePool;
private long burstPoolSize;
private long maxOverActivePool;
private double burstRatio;
// TODO Consider removing volatile after investigating
private volatile long activePool;
private volatile long waitingPool;
private RateSpec rateSpec;
private long nanosPerOp;
// private long debugTrigger=0L;
// private long debugRate=1000000000;
private long blocks = 0L;
private TokenFiller filler;
private final ActivityDef activityDef;
/**
* This constructor tries to pick reasonable defaults for the token pool for
* a given rate spec. The active pool must be large enough to contain one
* op worth of time, and the burst ratio
*
* @param rateSpec a {@link RateSpec}
*/
public ThreadDrivenTokenPool(RateSpec rateSpec, ActivityDef activityDef) {
this.activityDef = activityDef;
apply(rateSpec);
logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
// filler.start();
}
/**
* Change the settings of this token pool, and wake any blocked callers
* just in case it allows them to proceed.
*
* @param rateSpec The rate specifier.
*/
@Override
public synchronized TokenPool apply(RateSpec rateSpec) {
this.rateSpec = rateSpec;
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
this.burstRatio = rateSpec.getBurstRatio();
this.burstPoolSize = maxOverActivePool - maxActivePool;
this.nanosPerOp = rateSpec.getNanosPerOp();
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
notifyAll();
return this;
}
@Override
public double getBurstRatio() {
return burstRatio;
}
/**
* Take tokens up to amt tokens form the pool and report
* the amount of token removed.
*
* @param amt tokens requested
* @return actual number of tokens removed, greater to or equal to zero
*/
@Override
public synchronized long takeUpTo(long amt) {
long take = Math.min(amt, activePool);
activePool -= take;
return take;
}
/**
* wait for the given number of tokens to be available, and then remove
* them from the pool.
*
* @return the total number of tokens untaken, including wait tokens
*/
@Override
public synchronized long blockAndTake() {
while (activePool < nanosPerOp) {
blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= nanosPerOp;
return waitingPool + activePool;
}
@Override
public synchronized long blockAndTake(long tokens) {
while (activePool < tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= tokens;
return waitingPool + activePool;
}
@Override
public long getWaitTime() {
return activePool + waitingPool;
}
@Override
public long getWaitPool() {
return waitingPool;
}
@Override
public long getActivePool() {
return activePool;
}
/**
* Add the given number of new tokens to the pool, forcing any amount
* that would spill over the current pool size into the wait token pool, but
* moving up to the configured burst tokens back from the wait token pool
* otherwise.
*
* The amount of backfilling that occurs is controlled by the backfill ratio,
* based on the number of tokens submitted. This causes normalizes the
* backfilling rate to the fill rate, so that it is not sensitive to refill
* scheduling.
*
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(long newTokens) {
boolean debugthis = false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
long needed = Math.max(maxActivePool - activePool, 0L);
long allocatedToActivePool = Math.min(newTokens, needed);
activePool += allocatedToActivePool;
// overflow logic
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
waitingPool += allocatedToOverflowPool;
// backfill logic
double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
long burstFillAllowed = (long) (refillFactor * burstPoolSize);
burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
long burstFill = Math.min(burstFillAllowed, waitingPool);
waitingPool -= burstFill;
activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (allocatedToOverflowPool > 0) {
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
}
if (burstFill > 0) {
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
}
System.out.println();
}
//System.out.println(this);
notifyAll();
return activePool + waitingPool;
}
@Override
public String toString() {
return "Tokens: active=" + activePool + "/" + maxActivePool
+ String.format(
" (%3.1f%%)A (%3.1f%%)B ",
(((double) activePool / (double) maxActivePool) * 100.0),
(((double) activePool / (double) maxOverActivePool) * 100.0)) + " waiting=" + waitingPool +
" blocks=" + blocks +
" rateSpec:" + ((rateSpec != null) ? rateSpec.toString() : "NULL");
}
@Override
public RateSpec getRateSpec() {
return rateSpec;
}
@Override
public synchronized long restart() {
long wait = activePool + waitingPool;
activePool = 0L;
waitingPool = 0L;
filler.restart();
return wait;
}
@Override
public void start() {
filler.start();
}
}

View File

@ -35,7 +35,7 @@ public class TokenFiller implements Runnable {
// (false);
private final long interval = (long) 1E6;
private final TokenPool tokenPool;
private final ThreadDrivenTokenPool tokenPool;
private volatile boolean running = true;
private RateSpec rateSpec;
private Thread thread;
@ -43,23 +43,21 @@ public class TokenFiller implements Runnable {
private final Timer timer;
/**
* A token filler adds tokens to a {@link TokenPool} at some rate.
* A token filler adds tokens to a {@link ThreadDrivenTokenPool} at some rate.
* By default, this rate is at least every millisecond +- scheduling jitter
* in the JVM.
*
* @param rateSpec A {@link RateSpec}
* @param def An {@link ActivityDef}
* @param def An {@link ActivityDef}
*/
public TokenFiller(RateSpec rateSpec, ActivityDef def) {
public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, ActivityDef def) {
this.rateSpec = rateSpec;
this.tokenPool= new TokenPool(rateSpec);
this.tokenPool.refill(rateSpec.getNanosPerOp());
this.tokenPool = tokenPool;
this.timer = ActivityMetrics.timer(def, "tokenfiller");
}
public TokenFiller apply(RateSpec rateSpec) {
this.rateSpec = rateSpec;
this.tokenPool.apply(rateSpec);
return this;
}
@ -99,6 +97,8 @@ public class TokenFiller implements Runnable {
}
public TokenFiller start() {
this.tokenPool.refill(rateSpec.getNanosPerOp());
thread = new Thread(this);
thread.setName(this.toString());
thread.setPriority(Thread.MAX_PRIORITY);

View File

@ -1,252 +1,25 @@
/*
*
* Copyright 2016 jshook
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
public interface TokenPool {
TokenPool apply(RateSpec rateSpec);
import static io.nosqlbench.engine.api.util.Colors.*;
double getBurstRatio();
/**
* <h2>Synopsis</h2>
*
* This TokenPool represents a finite quantity which can be
* replenished with regular refills. Extra tokens that do not fit
* within the active token pool are saved in a waiting token pool and
* used to backfill when allowed according to the backfill rate.
*
* A detailed explanation for how this works will be included
* at @link "http://docs.nosqlbench.io/" under dev notes.
*
* <p>This is the basis for the token-based rate limiters in
* NB. This mechanism is easily adaptable to bursting
* capability as well as a degree of stricter timing at speed.
* Various methods for doing this in a lock free way were
* investigated, but the intrinsic locks provided by synchronized
* method won out for now. This may be revisited when EB is
* retrofitted for J11.
* </p>
*/
public class TokenPool {
long takeUpTo(long amt);
private final static Logger logger = LogManager.getLogger(TokenPool.class);
long blockAndTake();
public static final double MIN_CONCURRENT_OPS = 2;
long blockAndTake(long tokens);
private long maxActivePool;
private long burstPoolSize;
private long maxOverActivePool;
private double burstRatio;
// TODO Consider removing volatile after investigating
private volatile long activePool;
private volatile long waitingPool;
private RateSpec rateSpec;
private long nanosPerOp;
// private long debugTrigger=0L;
// private long debugRate=1000000000;
private long blocks = 0L;
long getWaitTime();
long getWaitPool();
/**
* This constructor tries to pick reasonable defaults for the token pool for
* a given rate spec. The active pool must be large enough to contain one
* op worth of time, and the burst ratio
*
* @param rateSpec a {@link RateSpec}
*/
public TokenPool(RateSpec rateSpec) {
apply(rateSpec);
logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
}
long getActivePool();
public TokenPool(long poolsize, double burstRatio) {
this.maxActivePool = poolsize;
this.burstRatio = burstRatio;
this.maxOverActivePool = (long) (maxActivePool * burstRatio);
this.burstPoolSize = maxOverActivePool - maxActivePool;
}
RateSpec getRateSpec();
/**
* Change the settings of this token pool, and wake any blocked callers
* just in case it allows them to proceed.
*
* @param rateSpec The rate specifier.
*/
public synchronized void apply(RateSpec rateSpec) {
this.rateSpec=rateSpec;
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
this.burstRatio = rateSpec.getBurstRatio();
long restart();
this.burstPoolSize = maxOverActivePool - maxActivePool;
this.nanosPerOp = rateSpec.getNanosPerOp();
notifyAll();
}
public double getBurstRatio() {
return burstRatio;
}
/**
* Take tokens up to amt tokens form the pool and report
* the amount of token removed.
*
* @param amt tokens requested
* @return actual number of tokens removed, greater to or equal to zero
*/
public synchronized long takeUpTo(long amt) {
long take = Math.min(amt, activePool);
activePool -= take;
return take;
}
/**
* wait for the given number of tokens to be available, and then remove
* them from the pool.
*
* @return the total number of tokens untaken, including wait tokens
*/
public synchronized long blockAndTake() {
while (activePool < nanosPerOp) {
blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= nanosPerOp;
return waitingPool + activePool;
}
public synchronized long blockAndTake(long tokens) {
while (activePool < tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= tokens;
return waitingPool + activePool;
}
public long getWaitTime() {
return activePool + waitingPool;
}
public long getWaitPool() {
return waitingPool;
}
public long getActivePool() {
return activePool;
}
/**
* Add the given number of new tokens to the pool, forcing any amount
* that would spill over the current pool size into the wait token pool, but
* moving up to the configured burst tokens back from the wait token pool
* otherwise.
*
* The amount of backfilling that occurs is controlled by the backfill ratio,
* based on the number of tokens submitted. This causes normalizes the
* backfilling rate to the fill rate, so that it is not sensitive to refill
* scheduling.
*
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(long newTokens) {
boolean debugthis=false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
long needed = Math.max(maxActivePool - activePool, 0L);
long allocatedToActivePool = Math.min(newTokens, needed);
activePool += allocatedToActivePool;
// overflow logic
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
waitingPool += allocatedToOverflowPool;
// backfill logic
double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
long burstFillAllowed =(long) (refillFactor* burstPoolSize);
burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
long burstFill = Math.min(burstFillAllowed, waitingPool);
waitingPool -= burstFill;
activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (allocatedToOverflowPool>0) {
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
}
if (burstFill>0) {
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
}
System.out.println();
}
//System.out.println(this);
notifyAll();
return activePool+waitingPool;
}
@Override
public String toString() {
return "Tokens: active=" + activePool +"/" + maxActivePool
+ String.format(
" (%3.1f%%)A (%3.1f%%)B ",
(((double)activePool/(double)maxActivePool)*100.0),
(((double)activePool/(double)maxOverActivePool)*100.0)) + " waiting=" + waitingPool +
" blocks=" + blocks +
" rateSpec:"+ ((rateSpec!=null) ? rateSpec.toString() : "NULL");
}
public RateSpec getRateSpec() {
return rateSpec;
}
public synchronized long restart() {
long wait=activePool+waitingPool;
activePool=0L;
waitingPool=0L;
return wait;
}
void start();
}

View File

@ -27,6 +27,7 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
@ -52,9 +53,23 @@ public class SimpleActivity implements Activity, ProgressCapable {
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
private int nameEnumerator = 0;
public SimpleActivity(ActivityDef activityDef) {
this.activityDef = activityDef;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
);
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ String.valueOf(nameEnumerator++));
}
}
}
public SimpleActivity(String activityDefString) {

View File

@ -17,30 +17,33 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import org.junit.Ignore;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TokenPoolTest {
ActivityDef def = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
@Test
public void testBackfillFullRate() {
TokenPool p = new TokenPool(100, 1.1);
assertThat(p.refill(100L)).isEqualTo(100L);
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(10000000, 1.1), def);
assertThat(p.refill(1000000L)).isEqualTo(1000000L);
assertThat(p.getWaitPool()).isEqualTo(0L);
assertThat(p.refill(100L)).isEqualTo(200);
assertThat(p.refill(100L)).isEqualTo(1000100);
assertThat(p.getWaitPool()).isEqualTo(90L);
assertThat(p.refill(10L)).isEqualTo(210L);
assertThat(p.getWaitPool()).isEqualTo(100L);
assertThat(p.refill(10L)).isEqualTo(1000110L);
assertThat(p.getWaitPool()).isEqualTo(99L);
assertThat(p.refill(10)).isEqualTo(220L);
assertThat(p.refill(10)).isEqualTo(1000120L);
assertThat(p.takeUpTo(100)).isEqualTo(100L);
}
@Test
public void testTakeRanges() {
TokenPool p = new TokenPool(100, 10);
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(100, 10), def);
p.refill(100);
assertThat(p.takeUpTo(99)).isEqualTo(99L);
assertThat(p.takeUpTo(10)).isEqualTo(1L);
@ -51,7 +54,7 @@ public class TokenPoolTest {
public void testChangedParameters() {
RateSpec s1 = new RateSpec(1000L, 1.10D);
TokenPool p = new TokenPool(s1);
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(s1, def);
long r = p.refill(10000000);
assertThat(r).isEqualTo(10000000L);
assertThat(p.getWaitTime()).isEqualTo(10000000L);

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docker</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -21,7 +21,7 @@ import io.nosqlbench.engine.core.script.ScenariosExecutor;
import io.nosqlbench.engine.core.script.ScriptParams;
import io.nosqlbench.engine.docker.DockerMetricsManager;
import io.nosqlbench.nb.api.annotations.Annotation;
import io.nosqlbench.nb.api.Layer;
import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.content.Content;
import io.nosqlbench.nb.api.content.NBIO;
import io.nosqlbench.nb.api.errors.BasicError;
@ -89,8 +89,10 @@ public class NBCLI {
loggerConfig.setConsoleLevel(NBLogLevel.ERROR);
NBCLIOptions globalOptions = new NBCLIOptions(args, NBCLIOptions.Mode.ParseGlobalsOnly);
String sessionName = new SessionNamer().format(globalOptions.getSessionName());
loggerConfig
.setSessionName(sessionName)
.setConsoleLevel(globalOptions.getConsoleLogLevel())
.setConsolePattern(globalOptions.getConsoleLoggingPattern())
.setLogfileLevel(globalOptions.getScenarioLogLevel())
@ -103,6 +105,7 @@ public class NBCLI {
logger = LogManager.getLogger("NBCLI");
loggerConfig.purgeOldFiles(LogManager.getLogger("SCENARIO"));
logger.info("Configured scenario log at " + loggerConfig.getLogfileLocation());
logger.debug("Scenario log started");
// Global only processing
if (args.length == 0) {
@ -131,7 +134,8 @@ public class NBCLI {
logger.info("Docker metrics is enabled. Docker must be installed for this to work");
DockerMetricsManager dmh = new DockerMetricsManager();
Map<String, String> dashboardOptions = Map.of(
DockerMetricsManager.GRAFANA_TAG, globalOptions.getDockerGrafanaTag()
DockerMetricsManager.GRAFANA_TAG, globalOptions.getDockerGrafanaTag(),
DockerMetricsManager.PROM_TAG, globalOptions.getDockerPromTag()
);
dmh.startMetrics(dashboardOptions);
String warn = "Docker Containers are started, for grafana and prometheus, hit" +
@ -144,8 +148,11 @@ public class NBCLI {
if (metricsAddr != null) {
reportGraphiteTo = metricsAddr + ":9109";
annotatorsConfig = "[{type:'log'},{type:'grafana',baseurl:'http://" + metricsAddr + ":3000/'," +
annotatorsConfig = "[{type:'log',level:'info'},{type:'grafana',baseurl:'http://" + metricsAddr + ":3000" +
"/'," +
"tags:'appname:nosqlbench',timeoutms:5000,onerror:'warn'}]";
} else {
annotatorsConfig = "[{type:'log',level:'info'}]";
}
if (args.length > 0 && args[0].toLowerCase().equals("virtdata")) {
@ -162,7 +169,6 @@ public class NBCLI {
}
NBCLIOptions options = new NBCLIOptions(args);
String sessionName = new SessionNamer().format(options.getSessionName());
logger = LogManager.getLogger("NBCLI");
NBIO.addGlobalIncludes(options.wantsIncludes());
@ -271,6 +277,7 @@ public class NBCLI {
System.exit(0);
}
logger.debug("initializing annotators with config:'" + annotatorsConfig + "'");
Annotators.init(annotatorsConfig);
Annotators.recordAnnotation(
Annotation.newBuilder()
@ -330,7 +337,8 @@ public class NBCLI {
options.getProgressSpec(),
options.wantsGraaljsCompatMode(),
options.wantsStackTraces(),
options.wantsCompileScript()
options.wantsCompileScript(),
String.join("\n", args)
);
ScriptBuffer buffer = new BasicScriptBuffer()
.add(options.getCommands().toArray(new Cmd[0]));
@ -378,7 +386,7 @@ public class NBCLI {
scenariosResults.reportToLog();
ShutdownManager.shutdown();
logger.info(scenariosResults.getExecutionSummary());
// logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
Exception exception = scenariosResults.getOne().getException().get();

View File

@ -4,10 +4,8 @@ import io.nosqlbench.engine.api.metrics.IndicatorMode;
import io.nosqlbench.engine.api.util.Unit;
import io.nosqlbench.engine.core.script.Scenario;
import io.nosqlbench.nb.api.Environment;
import io.nosqlbench.nb.api.logging.NBLogLevel;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import io.nosqlbench.nb.api.logging.NBLogLevel;
import java.io.File;
import java.io.IOException;
@ -37,7 +35,6 @@ public class NBCLIOptions {
private static final String METRICS_PREFIX = "--metrics-prefix";
// private static final String ANNOTATE_TO_GRAFANA = "--grafana-baseurl";
private static final String ANNOTATE_EVENTS = "--annotate";
private static final String ANNOTATORS_CONFIG = "--annotators";
private static final String DEFAULT_ANNOTATORS = "all";
@ -88,11 +85,12 @@ public class NBCLIOptions {
private final static String ENABLE_CHART = "--enable-chart";
private final static String DOCKER_METRICS = "--docker-metrics";
private final static String DOCKER_METRICS_AT = "--docker-metrics-at";
private static final String DOCKER_GRAFANA_TAG = "--docker-grafana-tag";
private static final String DOCKER_PROM_TAG = "--docker-prom-tag";
private static final String GRAALJS_ENGINE = "--graaljs";
private static final String NASHORN_ENGINE = "--nashorn";
private static final String GRAALJS_COMPAT = "--graaljs-compat";
private static final String DOCKER_GRAFANA_TAG = "--docker-grafana-tag";
private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%7r %-5level [%t] %-12logger{0} %msg%n%throwable";
@ -136,8 +134,9 @@ public class NBCLIOptions {
private final List<String> wantsToIncludePaths = new ArrayList<>();
private Scenario.Engine engine = Scenario.Engine.Graalvm;
private boolean graaljs_compat = false;
private int hdr_digits = 4;
private String docker_grafana_tag = "7.2.2";
private int hdr_digits = 3;
private String docker_grafana_tag = "7.3.4";
private String docker_prom_tag = "latest";
private boolean showStackTraces = false;
private boolean compileScript = false;
private String scriptFile = null;
@ -146,8 +145,8 @@ public class NBCLIOptions {
private String annotatorsConfig = "";
private String statedirs = NB_STATEDIR_PATHS;
private Path statepath;
private List<String> statePathAccesses = new ArrayList<>();
private String hdrForChartFileName = DEFAULT_CHART_HDR_LOG_NAME;
private final List<String> statePathAccesses = new ArrayList<>();
private final String hdrForChartFileName = DEFAULT_CHART_HDR_LOG_NAME;
public String getAnnotatorsConfig() {
return annotatorsConfig;
@ -285,6 +284,10 @@ public class NBCLIOptions {
arglist.removeFirst();
workspacesDirectory = readWordOrThrow(arglist, "a workspaces directory");
break;
case DOCKER_PROM_TAG:
arglist.removeFirst();
docker_prom_tag = readWordOrThrow(arglist, "prometheus docker tag");
break;
case DOCKER_GRAFANA_TAG:
arglist.removeFirst();
docker_grafana_tag = readWordOrThrow(arglist, "grafana docker tag");
@ -766,6 +769,10 @@ public class NBCLIOptions {
return docker_grafana_tag;
}
public String getDockerPromTag() {
return docker_prom_tag;
}
public static class LoggerConfigData {
public String file;
public String pattern = ".*";

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -21,7 +21,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -1,6 +1,7 @@
package io.nosqlbench.engine.clients.grafana;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class By {
@ -57,8 +58,11 @@ public class By {
return new By("type", "alert");
}
public static By tags(String tag) {
return new By("tags", tag);
/**
* Add one tag at a time, in either "tag" or "tag:value" form.
*/
public static By tag(String tag) {
return new By("tag", tag);
}
public static By id(int id) {
@ -69,11 +73,12 @@ public class By {
List<String> tags = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (By by : bys) {
if (by.key.equals("tags")) {
tags.add(by.value.toString());
if (by.key.equals("tag")) {
tags.addAll(Arrays.asList(by.value.toString().split(",")));
} else {
sb.append(by.key).append("=").append(by.value);
sb.append("&");
}
sb.append(by.key).append("=").append(by.value);
sb.append("&");
}
for (String tag : tags) {
sb.append("tags=").append(tag).append("&");

View File

@ -2,13 +2,19 @@ package io.nosqlbench.engine.clients.grafana;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
import io.nosqlbench.engine.clients.grafana.transfer.Annotations;
import io.nosqlbench.engine.clients.grafana.transfer.ApiTokenRequest;
import io.nosqlbench.engine.clients.grafana.transfer.DashboardResponse;
import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
import java.io.File;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.function.Supplier;
/**
* @see <a href="https://grafana.com/docs/grafana/latest/http_api/annotations/">Grafana Annotations API Docs</a>
@ -179,6 +185,27 @@ public class GrafanaClient {
return savedGrafanaAnnotation;
}
public DashboardResponse getDashboardByUid(String uid) {
HttpClient client = config.newClient();
HttpRequest.Builder rqb = config.newRequest("api/dashboards/uid/" + uid);
rqb = rqb.GET();
HttpResponse<String> response = null;
try {
response = client.send(rqb.build(), HttpResponse.BodyHandlers.ofString());
} catch (Exception e) {
throw new RuntimeException(e);
}
if (response.statusCode() < 200 || response.statusCode() >= 300) {
throw new RuntimeException("Getting dashboard by uid (" + uid + ") failed with status code " + response.statusCode() +
" at baseuri " + config.getBaseUri() + ": " + response.body());
}
String body = response.body();
DashboardResponse dashboardResponse = gson.fromJson(body, DashboardResponse.class);
return dashboardResponse;
}
/**
* <pre>{@code
* POST /api/annotations/graphite
@ -302,13 +329,52 @@ public class GrafanaClient {
throw new RuntimeException("unimplemented");
}
/**
* This can be called to create an api token and store it for later use as long as you
* have the admin credentials for basic auth. This is preferred to continuing to
* passing basic auth for admin privileges. The permissions can now be narrowed or managed
* in a modular way.
*
* @param namer the principal name for the privelege
* @param role the Grafana role
* @param ttl Length of validity for the granted api token
* @param keyfilePath The path of the token. If it is present it will simply be used.
* @param un The basic auth username for the Admin role
* @param pw The basic auth password for the Admin role
*/
public void cacheApiToken(Supplier<String> namer, String role, long ttl, Path keyfilePath, String un, String pw) {
if (!Files.exists(keyfilePath)) {
GrafanaClientConfig basicClientConfig = config.copy();
basicClientConfig = basicClientConfig.basicAuth(un, pw);
GrafanaClient apiClient = new GrafanaClient(basicClientConfig);
String keyName = namer.get();
ApiToken apiToken = apiClient.createApiToken(keyName, role, ttl);
try {
if (keyfilePath.toString().contains(File.separator)) {
Files.createDirectories(keyfilePath.getParent(),
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---")));
}
Files.writeString(keyfilePath, apiToken.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
GrafanaMetricsAnnotator.AuthWrapper authHeaderSupplier = new GrafanaMetricsAnnotator.AuthWrapper(
"Authorization",
new GrafanaKeyFileReader(keyfilePath),
s -> "Bearer " + s
);
config.addHeaderSource(authHeaderSupplier);
}
public ApiToken createApiToken(String name, String role, long ttl) {
ApiTokenRequest r = new ApiTokenRequest(name, role, ttl);
ApiToken token = postToGrafana(r, ApiToken.class, "gen api token");
ApiToken token = postApiTokenRequest(r, ApiToken.class, "gen api token");
return token;
}
private <T> T postToGrafana(Object request, Class<? extends T> clazz, String desc) {
private <T> T postApiTokenRequest(Object request, Class<? extends T> clazz, String desc) {
HttpRequest rq = config.newJsonPOST("api/auth/keys", request);
HttpClient client = config.newClient();
@ -331,4 +397,5 @@ public class GrafanaClient {
T result = gson.fromJson(body, clazz);
return result;
}
}

View File

@ -11,7 +11,6 @@ import io.nosqlbench.nb.api.config.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
@ -19,11 +18,11 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
@Service(value = Annotator.class, selector = "grafana" )
@Service(value = Annotator.class, selector = "grafana")
public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
private final static Logger logger = LogManager.getLogger("ANNOTATORS" );
private final static Logger annotationsLog = LogManager.getLogger("ANNOTATIONS" );
private final static Logger logger = LogManager.getLogger("ANNOTATORS");
//private final static Logger annotationsLog = LogManager.getLogger("ANNOTATIONS" );
private OnError onError = OnError.Warn;
private GrafanaClient client;
@ -45,9 +44,15 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
});
ga.getTags().add("layer:" + annotation.getLayer().toString());
if (annotation.getStart() == annotation.getEnd()) {
ga.getTags().add("span:instant");
} else {
ga.getTags().add("span:interval");
}
Map<String, String> labels = annotation.getLabels();
Optional.ofNullable(labels.get("alertId" ))
Optional.ofNullable(labels.get("alertId"))
.map(Integer::parseInt).ifPresent(ga::setAlertId);
ga.setText(annotation.toString());
@ -56,33 +61,32 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
// Target
Optional.ofNullable(labels.get("type" ))
Optional.ofNullable(labels.get("type"))
.ifPresent(ga::setType);
Optional.ofNullable(labels.get("id" )).map(Integer::valueOf)
Optional.ofNullable(labels.get("id")).map(Integer::valueOf)
.ifPresent(ga::setId);
Optional.ofNullable(labels.get("alertId" )).map(Integer::valueOf)
Optional.ofNullable(labels.get("alertId")).map(Integer::valueOf)
.ifPresent(ga::setAlertId);
Optional.ofNullable(labels.get("dashboardId" )).map(Integer::valueOf)
Optional.ofNullable(labels.get("dashboardId")).map(Integer::valueOf)
.ifPresent(ga::setDashboardId);
Optional.ofNullable(labels.get("panelId" )).map(Integer::valueOf)
Optional.ofNullable(labels.get("panelId")).map(Integer::valueOf)
.ifPresent(ga::setPanelId);
Optional.ofNullable(labels.get("userId" )).map(Integer::valueOf)
Optional.ofNullable(labels.get("userId")).map(Integer::valueOf)
.ifPresent(ga::setUserId);
Optional.ofNullable(labels.get("userName" ))
Optional.ofNullable(labels.get("userName"))
.ifPresent(ga::setUserName);
Optional.ofNullable(labels.get("metric" ))
Optional.ofNullable(labels.get("metric"))
.ifPresent(ga::setMetric);
// Details
annotationsLog.info("ANNOTATION:" + ga.toString());
GrafanaAnnotation created = this.client.createAnnotation(ga);
} catch (Exception e) {
@ -110,58 +114,65 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
GrafanaClientConfig gc = new GrafanaClientConfig();
gc.setBaseUri(cfg.param("baseurl", String.class));
if (cfg.containsKey("tags" )) {
if (cfg.containsKey("tags")) {
this.tags = ParamsParser.parse(cfg.param("tags", String.class), false);
}
if (cfg.containsKey("username" )) {
if (cfg.containsKey("password" )) {
if (cfg.containsKey("username")) {
if (cfg.containsKey("password")) {
gc.basicAuth(
cfg.param("username", String.class),
cfg.param("password", String.class)
);
} else {
gc.basicAuth(cfg.param("username", String.class), "" );
gc.basicAuth(cfg.param("username", String.class), "");
}
}
Path keyfilePath = null;
if (cfg.containsKey("apikeyfile" )) {
if (cfg.containsKey("apikeyfile")) {
String apikeyfile = cfg.paramEnv("apikeyfile", String.class);
keyfilePath = Path.of(apikeyfile);
} else if (cfg.containsKey("apikey" )) {
} else if (cfg.containsKey("apikey")) {
gc.addHeaderSource(() -> Map.of("Authorization", "Bearer " + cfg.param("apikey", String.class)));
} else {
Optional<String> apikeyLocation = Environment.INSTANCE.interpolate("$NBSTATEDIR/grafana_apikey" );
Optional<String> apikeyLocation = Environment.INSTANCE
.interpolate(cfg.paramEnv("apikeyfile", String.class));
keyfilePath = apikeyLocation.map(Path::of).orElseThrow();
}
if (!Files.exists(keyfilePath)) {
logger.info("Auto-configuring grafana apikey." );
GrafanaClientConfig apiClientConf = gc.copy().basicAuth("admin", "admin" );
GrafanaClient apiClient = new GrafanaClient(apiClientConf);
try {
String nodeId = SystemId.getNodeId();
String keyName = "nosqlbench-" + nodeId + "-" + System.currentTimeMillis();
ApiToken apiToken = apiClient.createApiToken(keyName, "Admin", Long.MAX_VALUE);
Files.writeString(keyfilePath, apiToken.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// if (!Files.exists(keyfilePath)) {
// logger.info("Auto-configuring grafana apikey.");
// GrafanaClientConfig apiClientConf = gc.copy().basicAuth("admin", "admin");
// GrafanaClient apiClient = new GrafanaClient(apiClientConf);
// try {
// String nodeId = SystemId.getNodeId();
//
// String keyName = "nosqlbench-" + nodeId + "-" + System.currentTimeMillis();
// ApiToken apiToken = apiClient.createApiToken(keyName, "Admin", Long.MAX_VALUE);
// Files.createDirectories(keyfilePath.getParent(),
// PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---")));
// Files.writeString(keyfilePath, apiToken.getKey());
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
//
// AuthWrapper authHeaderSupplier = new AuthWrapper(
// "Authorization",
// new GrafanaKeyFileReader(keyfilePath),
// s -> "Bearer " + s
// );
// gc.addHeaderSource(authHeaderSupplier);
AuthWrapper authHeaderSupplier = new AuthWrapper(
"Authorization",
new GrafanaKeyFileReader(keyfilePath),
s -> "Bearer " + s
);
gc.addHeaderSource(authHeaderSupplier);
this.onError = OnError.valueOfName(cfg.get("onerror" ).toString());
this.onError = OnError.valueOfName(cfg.get("onerror").toString());
this.client = new GrafanaClient(gc);
String keyName = "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
Supplier<String> namer = () -> "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
this.client.cacheApiToken(namer, "Admin", Long.MAX_VALUE, keyfilePath, "admin", "admin");
}
@ -169,22 +180,22 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
public ConfigModel getConfigModel() {
return new MutableConfigModel(this)
.required("baseurl", String.class,
"The base url of the grafana node, like http://localhost:3000/" )
.defaultto("apikeyfile", "$NBSTATEDIR/grafana_apikey",
"The file that contains the api key, supersedes apikey" )
"The base url of the grafana node, like http://localhost:3000/")
.defaultto("apikeyfile", "$NBSTATEDIR/grafana/grafana_apikey",
"The file that contains the api key, supersedes apikey")
.optional("apikey", String.class,
"The api key to use, supersedes basic username and password" )
"The api key to use, supersedes basic username and password")
.optional("username", String.class,
"The username to use for basic auth" )
"The username to use for basic auth")
.optional("password", String.class,
"The password to use for basic auth" )
"The password to use for basic auth")
.defaultto("tags", "source:nosqlbench",
"The tags that identify the annotations, in k:v,... form" )
"The tags that identify the annotations, in k:v,... form")
// .defaultto("onerror", OnError.Warn)
.defaultto("onerror", "warn",
"What to do when an error occurs while posting an annotation" )
"What to do when an error occurs while posting an annotation")
.defaultto("timeoutms", 5000,
"connect and transport timeout for the HTTP client" )
"connect and transport timeout for the HTTP client")
.asReadOnly();
}

View File

@ -0,0 +1,42 @@
package io.nosqlbench.engine.clients.grafana.analyzer;
import io.nosqlbench.engine.clients.grafana.By;
import io.nosqlbench.engine.clients.grafana.GrafanaClient;
import io.nosqlbench.engine.clients.grafana.GrafanaClientConfig;
import io.nosqlbench.engine.clients.grafana.transfer.Annotations;
import io.nosqlbench.engine.clients.grafana.transfer.DashboardResponse;
import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
import io.nosqlbench.nb.api.SystemId;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class GrafanaRegionAnalyzer {
public static void main(String[] args) {
GrafanaClientConfig ccfg = new GrafanaClientConfig();
ccfg.setBaseUri("http://18.191.247.162:3000/");
GrafanaClient gclient = new GrafanaClient(ccfg);
Supplier<String> namer = () -> "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
gclient.cacheApiToken(namer, "Admin", Long.MAX_VALUE, Path.of("grafana_apikey"), "admin", "admin");
DashboardResponse dashboardResponse = gclient.getDashboardByUid("aIIX1f6Wz");
Annotations annotations = gclient.findAnnotations(By.tag("appname:nosqlbench,layer:Activity"));
List<GrafanaAnnotation> mainActivities = annotations.stream()
.filter(s -> s.getTagMap().getOrDefault("alias", "").contains("main"))
.sorted(Comparator.comparing(t -> t.getTime()))
.collect(Collectors.toList());
System.out.println("end");
GrafanaAnnotation last = mainActivities.get(mainActivities.size() - 1);
long start = last.getTime();
long end = last.getTimeEnd();
}
}

View File

@ -0,0 +1,28 @@
package io.nosqlbench.engine.clients.grafana.transfer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Dashboard {
Map<String, Object> annotations = new HashMap<>();
String description;
boolean editable;
long graphToolTip;
long id;
long iteration;
List<Object> links;
List<PanelDef> panels;
String refresh;
long schemaVersion;
String style;
List<Object> tags;
Map<String, Object> templating;
Map<String, Object> time;
Map<String, List<String>> timepicker;
String timezone;
String title;
String uid;
long version;
}

View File

@ -0,0 +1,8 @@
package io.nosqlbench.engine.clients.grafana.transfer;
import java.util.List;
import java.util.Map;
public class DashboardAnnotations {
List<Map<String, Object>> list;
}

View File

@ -0,0 +1,9 @@
package io.nosqlbench.engine.clients.grafana.transfer;
import java.util.HashMap;
import java.util.Map;
public class DashboardResponse {
Map<String, String> meta = new HashMap<>();
Dashboard dashboard;
}

View File

@ -171,4 +171,13 @@ public class GrafanaAnnotation {
", data=" + data +
'}';
}
public LinkedHashMap<String, String> getTagMap() {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
for (String tag : this.getTags()) {
String[] split = tag.split(":", 2);
map.put(split[0], (split.length == 2 ? split[1] : null));
}
return map;
}
}

View File

@ -0,0 +1,24 @@
package io.nosqlbench.engine.clients.grafana.transfer;
import java.util.List;
import java.util.Map;
public class PanelDef {
boolean collapsed;
Map<String, String> gridPos;
long id;
List<PanelDef> panels;
String description;
Map<String, Object> fieldConfig;
Map<String, String> options;
String pluginVersion;
List<Map<String, String>> targets;
String title;
String type;
String datasource;
public String toString() {
return id + ":'" + title + "'";
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -28,13 +28,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
@ -85,7 +85,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-clients</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

View File

@ -96,7 +96,14 @@ public class Annotators {
}
public static synchronized void recordAnnotation(Annotation annotation) {
getAnnotators().forEach(a -> a.recordAnnotation(annotation));
for (Annotator annotator : getAnnotators()) {
try {
logger.trace("calling annotator " + annotator.getName());
annotator.recordAnnotation(annotation);
} catch (Exception e) {
logger.error(e);
}
}
}
// public static synchronized void recordAnnotation(

View File

@ -18,6 +18,9 @@ import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.annotations.Annotation;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -52,24 +55,33 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private final List<Motor<?>> motors = new ArrayList<>();
private final Activity activity;
private final ActivityDef activityDef;
private ExecutorService executorService;
private final ExecutorService executorService;
private RuntimeException stoppingException;
private final static int waitTime = 10000;
private String sessionId = "";
private long startedAt = 0L;
private long stoppedAt = 0L;
private String[] annotatedCommand;
// private RunState intendedState = RunState.Uninitialized;
public ActivityExecutor(Activity activity) {
public ActivityExecutor(Activity activity, String sessionId) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this);
this.sessionId = sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
@ -86,9 +98,22 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
*/
public synchronized void startActivity() {
logger.info("starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
this.annotatedCommand = annotatedCommand;
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
try {
activity.setRunState(RunState.Starting);
this.startedAt = System.currentTimeMillis();
activity.initActivity();
//activity.onActivityDefUpdate(activityDef);
} catch (Exception e) {
@ -117,6 +142,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.setRunState(RunState.Stopped);
logger.info("stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
activitylogger.debug("STOP/after alias=(" + activity.getAlias() + ")");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
@ -179,11 +215,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
}
public boolean requestStopExecutor(int secondsToWait) {
public boolean finishAndShutdownExecutor(int secondsToWait) {
activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")");
logger.info("Stopping executor for " + activity.getAlias() + " when work completes.");
logger.debug("Stopping executor for " + activity.getAlias() + " when work completes.");
executorService.shutdown();
boolean wasStopped = false;
@ -201,7 +236,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.trace("closing auto-closeables");
activity.closeAutoCloseables();
activity.setRunState(RunState.Stopped);
this.stoppedAt = System.currentTimeMillis();
}
if (stoppingException != null) {
logger.trace("an exception caused the activity to stop:" + stoppingException.getMessage());
throw stoppingException;
@ -220,7 +257,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
public synchronized void handleParameterMapUpdate(ParameterMap parameterMap) {
if (activity instanceof ActivityDefObserver) {
((ActivityDefObserver) activity).onActivityDefUpdate(activityDef);
activity.onActivityDefUpdate(activityDef);
}
// An activity must be initialized before the motors and other components are
@ -231,10 +268,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustToActivityDef(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
@ -242,8 +279,27 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
return activityDef;
}
/**
* This is the canonical way to wait for an activity to finish. It ties together
* any way that an activity can finish under one blocking call.
* This should be awaited asynchronously from the control layer in separate threads.
*
* TODO: move activity finisher threaad to this class and remove separate implementation
*/
public boolean awaitCompletion(int waitTime) {
return requestStopExecutor(waitTime);
boolean finished = finishAndShutdownExecutor(waitTime);
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
return finished;
}
public boolean awaitFinish(int timeout) {
@ -267,8 +323,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
@ -311,18 +367,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running:
case Starting:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
@ -403,7 +459,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState);
if (!awaited) {
logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " +
Arrays.asList(awaitingState));
Arrays.asList(awaitingState));
break;
}
}
@ -447,8 +503,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
boolean awaitedRequiredState = awaitMotorState(m, waitTime, pollTime, awaitingState);
if (!awaitedRequiredState) {
String error = "Unable to await " + activityDef.getAlias() +
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
RuntimeException e = new RuntimeException(error);
logger.error(error);
throw e;

View File

@ -0,0 +1,23 @@
package io.nosqlbench.engine.core.lifecycle;
public class ActivityFinisher extends Thread {
private final ActivityExecutor executor;
private final int timeout;
private boolean result;
public ActivityFinisher(ActivityExecutor executor, int timeout) {
super(executor.getActivityDef().getAlias() + "_finisher");
this.executor = executor;
this.timeout = timeout;
}
@Override
public void run() {
result = executor.awaitCompletion(timeout);
}
public boolean getResult() {
return result;
}
}

View File

@ -20,6 +20,9 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.ProgressAndStateMeter;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.annotations.Annotation;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -38,6 +41,11 @@ public class ScenarioController {
private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
private final Map<String, ActivityExecutor> activityExecutors = new ConcurrentHashMap<>();
private final String sessionId;
public ScenarioController(String sessionId) {
this.sessionId = sessionId;
}
/**
* Start an activity, given the activity definition for it. The activity will be known in the scenario
@ -46,9 +54,20 @@ public class ScenarioController {
* @param activityDef string in alias=value1;driver=value2;... format
*/
public synchronized void start(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "start")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
scenariologger.debug("START " + activityDef.getAlias());
activityExecutor.startActivity();
}
/**
@ -84,6 +103,15 @@ public class ScenarioController {
* @param activityDef A definition for an activity to run
*/
public synchronized void run(int timeout, ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "run")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
scenariologger.debug("RUN alias=" + activityDef.getAlias());
scenariologger.debug(" (RUN/START) alias=" + activityDef.getAlias());
@ -135,6 +163,15 @@ public class ScenarioController {
* @param activityDef An activity def, including at least the alias parameter.
*/
public synchronized void stop(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "stop")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {
throw new RuntimeException("could not stop missing activity:" + activityDef);
@ -179,7 +216,7 @@ public class ScenarioController {
}
ActivityExecutor activityExecutor = getActivityExecutor(alias);
ParameterMap params = activityExecutor.getActivityDef().getParams();
scenariologger.debug("SET ("+alias+"/"+param + ")=(" + value + ")");
scenariologger.debug("SET (" + alias + "/" + param + ")=(" + value + ")");
params.set(param, value);
}
@ -242,30 +279,31 @@ public class ScenarioController {
if (executor == null && createIfMissing) {
String activityTypeName = activityDef.getParams().getOptionalString("driver","type").orElse(null);
String activityTypeName = activityDef.getParams().getOptionalString("driver", "type").orElse(null);
List<String> knownTypes = ActivityType.FINDER.getAll().stream().map(ActivityType::getName).collect(Collectors.toList());
// Infer the type from either alias or yaml if possible (exactly one matches)
if (activityTypeName==null) {
if (activityTypeName == null) {
List<String> matching = knownTypes.stream().filter(
n ->
activityDef.getParams().getOptionalString("alias").orElse("").contains(n)
|| activityDef.getParams().getOptionalString("yaml", "workload").orElse("").contains(n)
).collect(Collectors.toList());
if (matching.size()==1) {
activityTypeName=matching.get(0);
if (matching.size() == 1) {
activityTypeName = matching.get(0);
logger.info("param 'type' was inferred as '" + activityTypeName + "' since it was seen in yaml or alias parameter.");
}
}
if (activityTypeName==null) {
if (activityTypeName == null) {
String errmsg = "You must provide a driver=<driver> parameter. Valid examples are:\n" +
knownTypes.stream().map(t -> " driver="+t+"\n").collect(Collectors.joining());
knownTypes.stream().map(t -> " driver=" + t + "\n").collect(Collectors.joining());
throw new BasicError(errmsg);
}
ActivityType<?> activityType = ActivityType.FINDER.getOrThrow(activityTypeName);
executor = new ActivityExecutor(activityType.getAssembledActivity(activityDef, getActivityMap()));
executor = new ActivityExecutor(activityType.getAssembledActivity(activityDef, getActivityMap()),
this.sessionId);
activityExecutors.put(activityDef.getAlias(), executor);
}
return executor;
@ -345,15 +383,33 @@ public class ScenarioController {
* @return true, if all activities completed before the timer expired, false otherwise
*/
public boolean awaitCompletion(int waitTimeMillis) {
boolean completed = false;
for (ActivityExecutor executor : activityExecutors.values()) {
if (!executor.awaitCompletion(waitTimeMillis)) {
logger.debug("awaiting completion signaled FALSE");
return false;
boolean completed = true;
long waitstart = System.currentTimeMillis();
long remaining = waitTimeMillis;
List<ActivityFinisher> finishers = new ArrayList<>();
for (ActivityExecutor ae : activityExecutors.values()) {
ActivityFinisher finisher = new ActivityFinisher(ae, (int) remaining);
finishers.add(finisher);
finisher.start();
}
for (ActivityFinisher finisher : finishers) {
try {
finisher.join(waitTimeMillis);
} catch (InterruptedException ignored) {
}
}
logger.debug("All activities awaiting completion signaled TRUE");
return true;
for (ActivityFinisher finisher : finishers) {
if (!finisher.getResult()) {
logger.debug("finisher for " + finisher.getName() + " did not signal TRUE");
completed = false;
}
}
return completed;
}
private ActivityDef aliasToDef(String alias) {
@ -364,9 +420,10 @@ public class ScenarioController {
}
}
public boolean await(Map<String,String> activityDefMap) {
public boolean await(Map<String, String> activityDefMap) {
return this.awaitActivity(activityDefMap);
}
public boolean awaitActivity(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
return awaitActivity(ad);
@ -375,6 +432,7 @@ public class ScenarioController {
public boolean await(String alias) {
return this.awaitActivity(alias);
}
public boolean awaitActivity(String alias) {
ActivityDef toAwait = aliasToDef(alias);
return awaitActivity(toAwait);
@ -383,6 +441,7 @@ public class ScenarioController {
public boolean await(ActivityDef activityDef) {
return this.awaitActivity(activityDef);
}
public boolean awaitActivity(ActivityDef activityDef) {
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {

View File

@ -31,23 +31,33 @@ import java.util.concurrent.TimeUnit;
public class ScenarioResult {
private final static Logger logger = LogManager.getLogger(ScenarioResult.class);
private final long startedAt;
private final long endedAt;
private Exception exception;
private final String iolog;
public ScenarioResult(String iolog) {
public ScenarioResult(String iolog, long startedAt, long endedAt) {
this.iolog = iolog;
this.startedAt = startedAt;
this.endedAt = endedAt;
}
public ScenarioResult(Exception e) {
public ScenarioResult(Exception e, long startedAt, long endedAt) {
this.iolog = e.getMessage();
this.startedAt = startedAt;
this.endedAt = endedAt;
this.exception = e;
}
public void reportToLog() {
public void reportElapsedMillis() {
logger.info("-- SCENARIO TOOK " + getElapsedMillis() + "ms --");
}
logger.info("-- BEGIN METRICS DETAIL --");
public void reportToLog() {
logger.debug("-- BEGIN METRICS DETAIL --");
Log4JMetricsReporter reporter = Log4JMetricsReporter.forRegistry(ActivityMetrics.getMetricRegistry())
.withLoggingLevel(Log4JMetricsReporter.LoggingLevel.DEBUG)
.convertDurationsTo(TimeUnit.MICROSECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
@ -87,4 +97,7 @@ public class ScenarioResult {
return this.iolog;
}
public long getElapsedMillis() {
return endedAt - startedAt;
}
}

View File

@ -66,8 +66,15 @@ public class ScenariosResults {
logger.info("results for scenario: " + scenario);
if (oresult != null) {
oresult.reportToLog();
oresult.reportToConsole();
oresult.reportElapsedMillis();
if (oresult.getElapsedMillis() >= 60_000) {
oresult.reportToConsole();
oresult.reportToLog();
} else {
logger.info("Metrics suppressed because scenario was less than 1 minute long.");
logger.info("Metrics data is not reliable for short sampling periods.");
logger.info("To get metrics on console, run a longer scenario.");
}
} else {
logger.error(scenario.getScenarioName() + ": incomplete (missing result)");
}

View File

@ -13,12 +13,16 @@ import org.apache.logging.log4j.core.config.ConfigurationSource;
import org.apache.logging.log4j.core.config.builder.api.*;
import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
import java.nio.file.attribute.*;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.*;
import java.util.stream.Collectors;
@ -202,6 +206,16 @@ public class LoggerConfig extends ConfigurationFactory {
}
public void activate() {
if (!Files.exists(loggerDir)) {
try {
FileAttribute<Set<PosixFilePermission>> attrs = PosixFilePermissions.asFileAttribute(
PosixFilePermissions.fromString("rwxrwx---")
);
Path directory = Files.createDirectory(loggerDir, attrs);
} catch (Exception e) {
throw new RuntimeException("Error while creating directory " + loggerDir.toString() + ": " + e.getMessage(), e);
}
}
ConfigurationFactory.setConfigurationFactory(this);
}

View File

@ -28,7 +28,8 @@ public class LoggingAnnotator implements Annotator, ConfigAware {
@Override
public void recordAnnotation(Annotation annotation) {
annotationsLog.log(level, annotation.toString());
String inlineForm = annotation.asJson();
annotationsLog.log(level, inlineForm);
}
/**

View File

@ -25,7 +25,7 @@ import io.nosqlbench.engine.core.lifecycle.ScenarioController;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.metrics.PolyglotMetricRegistryBindings;
import io.nosqlbench.nb.api.Layer;
import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.annotations.Annotation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -56,14 +56,19 @@ import java.util.stream.Collectors;
public class Scenario implements Callable<ScenarioResult> {
private final String commandLine;
private Logger logger = LogManager.getLogger("SCENARIO");
private State state = State.Scheduled;
private volatile ScenarioShutdownHook scenarioShutdownHook;
private Exception error;
public enum State {
Scheduled,
Running,
Errored,
Interrupted,
Finished
}
@ -96,7 +101,8 @@ public class Scenario implements Callable<ScenarioResult> {
String progressInterval,
boolean wantsGraaljsCompatMode,
boolean wantsStackTraces,
boolean wantsCompiledScript) {
boolean wantsCompiledScript,
String commandLine) {
this.scenarioName = scenarioName;
this.scriptfile = scriptfile;
this.engine = engine;
@ -104,6 +110,7 @@ public class Scenario implements Callable<ScenarioResult> {
this.wantsGraaljsCompatMode = wantsGraaljsCompatMode;
this.wantsStackTraces = wantsStackTraces;
this.wantsCompiledScript = wantsCompiledScript;
this.commandLine = commandLine;
}
public Scenario setLogger(Logger logger) {
@ -118,6 +125,7 @@ public class Scenario implements Callable<ScenarioResult> {
public Scenario(String name, Engine engine) {
this.scenarioName = name;
this.engine = engine;
this.commandLine = "";
}
public Scenario addScriptText(String scriptText) {
@ -180,7 +188,7 @@ public class Scenario implements Callable<ScenarioResult> {
break;
}
scenarioController = new ScenarioController();
scenarioController = new ScenarioController(this.scenarioName);
if (!progressInterval.equals("disabled")) {
activityProgressIndicator = new ActivityProgressIndicator(scenarioController, progressInterval);
}
@ -229,6 +237,9 @@ public class Scenario implements Callable<ScenarioResult> {
}
public void run() {
scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(scenarioShutdownHook);
state = State.Running;
startedAtMillis = System.currentTimeMillis();
@ -237,7 +248,6 @@ public class Scenario implements Callable<ScenarioResult> {
.session(this.scenarioName)
.now()
.layer(Layer.Scenario)
.label("scenario", getScenarioName())
.detail("engine", this.engine.toString())
.build()
);
@ -285,14 +295,12 @@ public class Scenario implements Callable<ScenarioResult> {
this.state = State.Errored;
logger.warn("Error in scenario, shutting down.");
this.scenarioController.forceStopScenario(5000, false);
this.error = e;
throw new RuntimeException(e);
} finally {
if (this.state==State.Running) {
this.state = State.Finished;
}
System.out.flush();
System.err.flush();
endedAtMillis=System.currentTimeMillis();
endedAtMillis = System.currentTimeMillis();
}
}
int awaitCompletionTime = 86400 * 365 * 1000;
@ -300,8 +308,38 @@ public class Scenario implements Callable<ScenarioResult> {
scenarioController.awaitCompletion(awaitCompletionTime);
//TODO: Ensure control flow covers controller shutdown in event of internal error.
logger.debug("scenario completed without errors");
endedAtMillis=System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook);
scenarioShutdownHook = null;
finish();
}
public void finish() {
logger.debug("finishing scenario");
endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (this.state == State.Running) {
this.state = State.Finished;
}
if (scenarioShutdownHook != null) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
this.state = State.Interrupted;
logger.warn("Scenario was interrupted by process exit, shutting down");
}
logger.info("scenario state: " + this.state);
// We report the scenario state via annotation even for short runs
Annotation annotation = Annotation.newBuilder()
.session(this.scenarioName)
.interval(this.startedAtMillis, endedAtMillis)
.layer(Layer.Scenario)
.label("state", this.state.toString())
.detail("command_line", this.commandLine)
.build();
Annotators.recordAnnotation(annotation);
}
public long getStartedAtMillis() {
@ -315,7 +353,7 @@ public class Scenario implements Callable<ScenarioResult> {
public ScenarioResult call() {
run();
String iolog = scriptEnv.getTimedLog();
return new ScenarioResult(iolog);
return new ScenarioResult(iolog, this.startedAtMillis, this.endedAtMillis);
}
@Override

View File

@ -0,0 +1,20 @@
package io.nosqlbench.engine.core.script;
import org.apache.logging.log4j.Logger;
public class ScenarioShutdownHook extends Thread {
private final Scenario scenario;
private final Logger logger;
public ScenarioShutdownHook(Scenario scenario) {
this.scenario = scenario;
logger = scenario.getLogger();
}
@Override
public void run() {
scenario.finish();
}
}

View File

@ -149,7 +149,8 @@ public class ScenariosExecutor {
try {
oResult = Optional.of(resultFuture.get());
} catch (Exception e) {
oResult = Optional.of(new ScenarioResult(e));
long now = System.currentTimeMillis();
oResult = Optional.of(new ScenarioResult(e, now, now));
}
}
@ -182,27 +183,28 @@ public class ScenariosExecutor {
if (resultFuture1 == null) {
throw new BasicError("Unknown scenario name:" + scenarioName);
}
long now = System.currentTimeMillis();
if (resultFuture1.isDone()) {
try {
return Optional.ofNullable(resultFuture1.get());
} catch (Exception e) {
return Optional.of(new ScenarioResult(e));
return Optional.of(new ScenarioResult(e, now, now));
}
} else if (resultFuture1.isCancelled()) {
return Optional.of(new ScenarioResult(new Exception("result was cancelled.")));
return Optional.of(new ScenarioResult(new Exception("result was cancelled."), now, now));
}
return Optional.empty();
}
public synchronized void stopScenario(String scenarioName) {
this.stopScenario(scenarioName,false);
this.stopScenario(scenarioName, false);
}
public synchronized void stopScenario(String scenarioName, boolean rethrow) {
Optional<Scenario> pendingScenario = getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) {
ScenarioController controller = pendingScenario.get().getScenarioController();
if (controller!=null) {
if (controller != null) {
controller.forceStopScenario(0, rethrow);
}
} else {

View File

@ -52,7 +52,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a);
ActivityExecutor ae = new ActivityExecutor(a, "test-restart");
ad.setThreads(1);
ae.startActivity();
ae.stopActivity();
@ -76,7 +76,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a);
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
ad.setThreads(1);
ae.startActivity();
ae.awaitCompletion(15000);
@ -101,7 +101,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a);
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor");
ad.setThreads(5);
ae.startActivity();

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -65,7 +65,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -45,7 +45,7 @@ public class DockerHelper {
}
public String startDocker(String IMG, String tag, String name, List<Integer> ports, List<String> volumeDescList, List<String> envList, List<String> cmdList, String reload, List<String> linkNames) {
logger.debug("Starting docker with img=" + IMG + ", tag=" + tag + ", name=" + name + ", " +
logger.info("Starting docker with img=" + IMG + ", tag=" + tag + ", name=" + name + ", " +
"ports=" + ports + ", volumes=" + volumeDescList + ", env=" + envList + ", cmds=" + cmdList + ", reload=" + reload);
boolean existingContainer = removeExitedContainers(name);
@ -155,7 +155,7 @@ public class DockerHelper {
logger.error("Unable to contact docker, make sure docker is up and try again.");
logger.error("If docker is installed make sure this user has access to the docker group.");
logger.error("$ sudo gpasswd -a ${USER} docker && newgrp docker");
System.exit(1);
throw e;
}
return false;
@ -178,7 +178,7 @@ public class DockerHelper {
logger.error("Unable to contact docker, make sure docker is up and try again.");
logger.error("If docker is installed make sure this user has access to the docker group.");
logger.error("$ sudo gpasswd -a ${USER} docker && newgrp docker");
System.exit(1);
throw e;
}
return false;
}
@ -193,7 +193,7 @@ public class DockerHelper {
} catch (Exception e) {
e.printStackTrace();
logger.error("Unable to contact docker, make sure docker is up and try again.");
System.exit(1);
throw e;
}
if (runningContainers.size() >= 1) {
@ -204,9 +204,10 @@ public class DockerHelper {
if (reload != null) {
try {
post(reload, null, false, "reloading config");
post(reload, null, false, "reload config");
} catch (Exception e) {
logger.error(String.format("Unexpected config/state for docker container %s, consider removing the container", name));
throw e;
}
}
@ -233,6 +234,7 @@ public class DockerHelper {
} catch (InterruptedException e) {
logger.error("Error getting docker log and detect start for containerId: " + containerId);
e.printStackTrace();
throw new RuntimeException(e);
}
}

View File

@ -29,6 +29,7 @@ import static io.nosqlbench.engine.docker.RestHelper.post;
public class DockerMetricsManager {
public static final String GRAFANA_TAG = "grafana_tag";
public static final String PROM_TAG = "prom_tag";
private final DockerHelper dh;
@ -44,7 +45,7 @@ public class DockerMetricsManager {
String ip = startGraphite();
startPrometheus(ip);
startPrometheus(ip, options.get(PROM_TAG));
startGrafana(ip, options.get(GRAFANA_TAG));
@ -71,9 +72,9 @@ public class DockerMetricsManager {
);
List<String> envList = Arrays.asList(
"GF_SECURITY_ADMIN_PASSWORD=admin",
"GF_AUTH_ANONYMOUS_ENABLED=\"true\"",
"GF_SNAPSHOTS_EXTERNAL_SNAPSHOT_URL=https://assethub.datastax.com:3001",
"GF_SNAPSHOTS_EXTERNAL_SNAPSHOT_NAME=\"Upload to DataStax\""
"GF_AUTH_ANONYMOUS_ENABLED=\"true\""
// , "GF_SNAPSHOTS_EXTERNAL_SNAPSHOT_URL=https://assethub.datastax.com:3001",
// "GF_SNAPSHOTS_EXTERNAL_SNAPSHOT_NAME=\"Upload to DataStax\""
);
String reload = null;
@ -93,12 +94,10 @@ public class DockerMetricsManager {
}
}
private void startPrometheus(String ip) {
private void startPrometheus(String ip, String tag) {
logger.info("preparing to start docker metrics");
String PROMETHEUS_IMG = "prom/prometheus";
String tag = "latest";
// String tag = "v2.20.1";
String name = "prom";
List<Integer> port = Arrays.asList(9090);
@ -228,7 +227,7 @@ public class DockerMetricsManager {
logger.error("failed to set permissions on prom backup " +
"directory " + userHome + "/.nosqlbench/prometheus)");
e.printStackTrace();
System.exit(1);
throw new RuntimeException(e);
}
try (PrintWriter out = new PrintWriter(
@ -238,11 +237,11 @@ public class DockerMetricsManager {
} catch (FileNotFoundException e) {
e.printStackTrace();
logger.error("error writing prometheus yaml file to ~/.prometheus");
System.exit(1);
throw new RuntimeException(e);
} catch (IOException e) {
e.printStackTrace();
logger.error("creating file in ~/.prometheus");
System.exit(1);
throw new RuntimeException(e);
}
}
@ -302,7 +301,7 @@ public class DockerMetricsManager {
logger.error("failed to set permissions on grafana directory " +
"directory " + userHome + "/.nosqlbench/grafana)");
e.printStackTrace();
System.exit(1);
throw new RuntimeException(e);
}
}
@ -348,6 +347,7 @@ public class DockerMetricsManager {
close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -28,7 +28,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,7 +22,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -47,7 +47,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -108,8 +108,8 @@ public class ScenarioExecutorEndpoint implements WebServiceObject {
"disabled",
false,
true,
false
);
false,
cmdList.toString());
scenario.addScriptText(buffer.getParsedScript());

View File

@ -3,7 +3,7 @@
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
@ -11,10 +11,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<javadoc.name>nosqlbench</javadoc.name>
<java.target.version>11</java.target.version>
<!-- properties for package versions -->
<antlr4.version>4.8</antlr4.version>
<ascii.data.version>1.2.0</ascii.data.version>
<commons.codec.version>1.14</commons.codec.version>
<commons.compress.version>1.20</commons.compress.version>
@ -323,7 +320,7 @@
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${antlr4.version}</version>
<version>4.8</version>
</dependency>
<dependency>
@ -504,8 +501,8 @@
<configuration>
<debug>true</debug>
<!-- <release>${java.target.version}</release>-->
<source>11</source>
<target>11</target>
<source>15</source>
<target>15</target>
<!-- <compilerArgs>-->
<!-- &#45;&#45;enable-preview-->
<!-- </compilerArgs>-->

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -31,7 +31,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -1,7 +1,5 @@
package io.nosqlbench.nb.api.annotations;
import io.nosqlbench.nb.api.Layer;
import java.util.Map;
/**
@ -73,4 +71,12 @@ public interface Annotation {
return new AnnotationBuilder();
}
/**
* This should return {@link Span#interval} if the span of time is not an instant, and
* {@link Span#instant}, otherwise.
*/
Span getSpan();
String asJson();
}

View File

@ -1,8 +1,8 @@
package io.nosqlbench.nb.api.annotations;
import io.nosqlbench.nb.api.Layer;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.TimeZone;
public class AnnotationBuilder implements BuilderFacets.All {
private String session;
@ -11,6 +11,7 @@ public class AnnotationBuilder implements BuilderFacets.All {
private final LinkedHashMap<String, String> labels = new LinkedHashMap<>();
private final LinkedHashMap<String, String> details = new LinkedHashMap<>();
private Layer layer;
private final TimeZone timezone = TimeZone.getTimeZone(ZoneId.of("GMT"));
@Override
public AnnotationBuilder layer(Layer layer) {
@ -50,6 +51,7 @@ public class AnnotationBuilder implements BuilderFacets.All {
return this;
}
@Override
public AnnotationBuilder label(String name, String value) {
this.labels.put(name, value);
@ -64,7 +66,7 @@ public class AnnotationBuilder implements BuilderFacets.All {
@Override
public Annotation build() {
return new MutableAnnotation(session, layer, start, end, labels, details).asReadOnly();
return new MutableAnnotation(timezone, session, layer, start, end, labels, details).asReadOnly();
}

View File

@ -1,7 +1,5 @@
package io.nosqlbench.nb.api.annotations;
import io.nosqlbench.nb.api.Layer;
public interface BuilderFacets {
interface All extends

View File

@ -1,4 +1,4 @@
package io.nosqlbench.nb.api;
package io.nosqlbench.nb.api.annotations;
public enum Layer {
@ -10,13 +10,13 @@ public enum Layer {
/**
* Events which describe scenario execution, such as parameters,
* lifecycle events, and critical errors
* lifecycle events, interruptions, and critical errors
*/
Scenario,
/**
* Events which describe scripting details, such as extensions,
* sending programmatic annotations, or critical errors
* Events which describe scripting details, such as commands,
* extension usages, sending programmatic annotations, or critical errors
*/
Script,

View File

@ -1,40 +1,66 @@
package io.nosqlbench.nb.api.annotations;
import io.nosqlbench.nb.api.Layer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
public class MutableAnnotation implements Annotation {
private final static Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
private String session = "SESSION_UNNAMED";
@Expose
private Layer layer;
@Expose
private long start = 0L;
@Expose
private long end = 0L;
@Expose
private Map<String, String> labels = new LinkedHashMap<>();
@Expose
private Map<String, String> details = new LinkedHashMap<>();
public MutableAnnotation(String session, Layer layer, long start, long end, LinkedHashMap<String, String> labels,
LinkedHashMap<String, String> details) {
this.session = session;
this.layer = layer;
this.start = start;
this.end = end;
this.details = details;
this.labels = labels;
private final ZoneId zoneid = ZoneId.of("GMT");
public MutableAnnotation(
TimeZone timezone,
String session,
Layer layer,
long start,
long end,
LinkedHashMap<String, String> labels,
LinkedHashMap<String, String> details) {
setLabels(labels);
setSession(session);
setLayer(layer);
setStart(start);
setEnd(end);
setDetails(details);
labels.put("appname", "nosqlbench");
}
public void setSession(String sessionName) {
this.session = sessionName;
this.labels.put("session", sessionName);
}
public void setStart(long intervalStart) {
this.start = intervalStart;
this.labels.put("span", getSpan().toString());
}
public void setEnd(long intervalEnd) {
this.end = intervalEnd;
this.labels.put("span", getSpan().toString());
}
public void setLabels(Map<String, String> labels) {
@ -72,6 +98,9 @@ public class MutableAnnotation implements Annotation {
@Override
public Map<String, String> getLabels() {
// if (!labels.containsKey("span")) {
// labels.put("span",getSpan().toString());
// }
return labels;
}
@ -85,11 +114,17 @@ public class MutableAnnotation implements Annotation {
StringBuilder sb = new StringBuilder();
sb.append("session: ").append(getSession()).append("\n");
sb.append("[").append(new Date(getStart()));
ZonedDateTime startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(getStart()), zoneid);
ZonedDateTime endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(getStart()), zoneid);
sb.append("[").append(startTime);
if (getStart() != getEnd()) {
sb.append(" - ").append(new Date(getEnd()));
sb.append(" - ").append(endTime);
}
sb.append("]\n");
sb.append("span:").append(getSpan()).append("\n");
sb.append("details:\n");
formatMap(sb, getDetails());
sb.append("labels:\n");
@ -114,8 +149,16 @@ public class MutableAnnotation implements Annotation {
});
}
public Annotation asReadOnly() {
return this;
}
public Span getSpan() {
return (getStart() == getEnd()) ? Span.instant : Span.interval;
}
public String asJson() {
String inlineForm = gson.toJson(this);
return inlineForm;
}
}

View File

@ -0,0 +1,12 @@
package io.nosqlbench.nb.api.annotations;
public enum Span {
/**
* A span of time of size zero.
*/
instant,
/**
* A span in time for which the start and end are different.
*/
interval
}

View File

@ -0,0 +1,7 @@
package io.nosqlbench.nb.api.labels;
import java.util.Map;
public interface Labeled {
Map<String, String> getLabels();
}

View File

@ -0,0 +1,19 @@
package io.nosqlbench.nb.api.labels;
import java.util.HashMap;
import java.util.Map;
public class MutableLabels extends HashMap<String, String> implements Labeled {
public static MutableLabels fromMaps(Map<String, String> entries) {
MutableLabels mutableLabels = new MutableLabels();
mutableLabels.putAll(entries);
return mutableLabels;
}
@Override
public Map<String, String> getLabels() {
return this;
}
}

View File

@ -1,41 +1,48 @@
package io.nosqlbench.nb.api.annotations;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class AnnotationBuilderTest {
private static final long time = 1600000000000L;
// Disabled until the feature is properly merged into main
// @Test
// public void testBasicAnnotation() {
//
// Annotation an1 = Annotation.newBuilder()
// .session("test-session")
// .at(time)
// .layer(Layer.Scenario)
// .label("labelka", "labelvb")
// .label("labelkc", "labelvd")
// .detail("detailk1", "detailv1")
// .detail("detailk2", "detailv21\ndetailv22")
// .detail("detailk3", "v1\nv2\nv3\n")
// .build();
//
// String represented = an1.toString();
// assertThat(represented).isEqualTo("session: test-session\n" +
// "[Sun Sep 13 07:26:40 CDT 2020]\n" +
// "details:\n" +
// " detailk1: detailv1\n" +
// " detailk2: \n" +
// " detailv21\n" +
// " detailv22\n" +
// " detailk3: \n" +
// " v1\n" +
// " v2\n" +
// " v3\n" +
// "labels:\n" +
// " layer: Scenario\n" +
// " labelka: labelvb\n" +
// " labelkc: labelvd\n");
//
// }
@Test
public void testBasicAnnotation() {
Annotation an1 = Annotation.newBuilder()
.session("test-session")
.at(time)
.layer(Layer.Scenario)
.label("labelka", "labelvb")
.label("labelkc", "labelvd")
.detail("detailk1", "detailv1")
.detail("detailk2", "detailv21\ndetailv22")
.detail("detailk3", "v1\nv2\nv3\n")
.build();
String represented = an1.toString();
assertThat(represented).isEqualTo("session: test-session\n" +
"[2020-09-13T12:26:40Z[GMT]]\n" +
"span:instant\n" +
"details:\n" +
" detailk1: detailv1\n" +
" detailk2: \n" +
" detailv21\n" +
" detailv22\n" +
" detailk3: \n" +
" v1\n" +
" v2\n" +
" v3\n" +
"labels:\n" +
" layer: Scenario\n" +
" labelka: labelvb\n" +
" labelkc: labelvd\n" +
" session: test-session\n" +
" span: instant\n" +
" appname: nosqlbench\n");
}
}

View File

@ -29,7 +29,7 @@ fi
rsync -av appimage/skel/ "${APPDIR}/"
cp target/nb.jar "${APPDIR}/usr/bin/nb.jar"
JAVA_VERSION="14"
JAVA_VERSION="15"
mkdir -p "${APPDIR}/usr/bin/jre"

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,31 +24,31 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-rest</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docs</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-extensions</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
@ -60,67 +60,67 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-web</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-kafka</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-diag</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-tcp</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-http</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jmx</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-dsegraph-shaded</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cqlverify</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
@ -244,7 +244,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cqld4</artifactId>
<version>3.12.157-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>
@ -257,7 +257,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>mvn-defaults</relativePath>
</parent>

View File

@ -2,7 +2,7 @@
set -e
#RELEASE_NOTES_FILE=${RELEASE_NOTES_FILE:?RELEASE_NOTES_FILE must be provided}
git log --oneline --decorate --max-count=1000 main > /tmp/gitlog_main
git log --oneline --decorate --max-count=1000 nb4-rc1 > /tmp/gitlog_main
readarray lines < /tmp/gitlog_main
for line in "${lines[@]}"

View File

@ -7,7 +7,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,14 +23,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<artifactId>nb-api</artifactId>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lang</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,13 +20,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,36 +18,36 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-realdata</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-realer</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-random</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<artifactId>virtdata-lib-basics</artifactId>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
<artifactId>virtdata-lib-curves4</artifactId>
</dependency>
@ -55,7 +55,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>3.12.160-SNAPSHOT</version>
<version>4.15.5-SNAPSHOT</version>
</dependency>
</dependencies>