diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index a10f5ff2f..ad11d6740 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -16,7 +16,7 @@ jobs:
- name: setup java
uses: actions/setup-java@v1
with:
- java-version: '14'
+ java-version: '15'
java-package: jdk
architecture: x64
diff --git a/Dockerfile b/Dockerfile
index 80d12e4f5..9d384545d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM openjdk:14-alpine
+FROM openjdk:15-alpine
RUN apk --no-cache add curl
COPY nb/target/nb.jar nb.jar
diff --git a/RELEASENOTES.md b/RELEASENOTES.md
index e69de29bb..4adc2f46d 100644
--- a/RELEASENOTES.md
+++ b/RELEASENOTES.md
@@ -0,0 +1,8 @@
+- 526d09cd (HEAD -> nb4-rc1) auto create dirs for grafana_apikey
+- b4ec4c9a (origin/nb4-rc1) trigger build
+- af87ef9c relaxed requirement for finicky test
+- 3436ec61 trigger build
+- 17ed4c1e annotator and dashboard fixes
+- 4dab9b89 move annotations enums to package
+- 6d514cb6 bump middle version number to required java version '15'
+- fa78e27f set NB4 to Java 15
diff --git a/devdocs/devguide/hybrid_ratelimiter.md b/devdocs/devguide/hybrid_ratelimiter.md
new file mode 100644
index 000000000..1da432cb6
--- /dev/null
+++ b/devdocs/devguide/hybrid_ratelimiter.md
@@ -0,0 +1,244 @@
+## RateLimiter Design
+
+The nosqlbench rate limiter is a hybrid design, combining ideas from
+well-known algorithms with a heavy dose of mechanical sympathy. The
+resulting implementation provides the following:
+
+1. A basic design that can be explained in one page (this page!)
+2. High throughput, compared to other rate limiters tested.
+3. Graceful degradation with increasing concurrency.
+4. Clearly defined behavioral semantics.
+5. Efficient burst capability, for tunable catch-up rates.
+6. Efficient calculation of wait time.
+
+## Parameters
+
+**rate** - In simplest terms, users simply need to configure the *rate*.
+For example, `rate=12000` specifies an op rate of 12000 ops/second.
+
+**burst rate** - Additionally, users may specify a burst rate which can be
+used to recover unused time when a client is able to go faster than the
+strict limit. The burst rate is multiplied by the _op rate_ to arrive at
+the maximum rate when wait time is available to recover. For
+example, `rate=12000,1.1`
+specifies that a client may operate at 12000 ops/s _when it is caught up_,
+while allowing it to go at a rate of up to 13200 ops/s _when it is behind
+schedule_.
+
+## Design Principles
+
+The core design of the rate limiter is based on
+the [token bucket](https://en.wikipedia.org/wiki/Token_bucket) algorithm
+as established in the telecom industry for rate metering. Additional
+refinements have been added to allow for flexible and reliable use on
+non-realtime systems.
+
+The unit of scheduling used in this design is the token, corresponding
+directly to a nanosecond of time. The scheduling time that is made
+available to callers is stored in a pool of tokens which is set to a
+configured size. The size of the token pool determines how many grants are
+allowed to be dispatched before the next one is forced to wait for
+available tokens.
+
+At some regular frequency, a filler thread adds tokens (nanoseconds of
+time to be distributed to waiting ops) to the pool. The callers which are
+waiting for these tokens consume a number of tokens serially. If the pool
+does not contain the requested number of tokens, then the caller is
+blocked using basic synchronization primitives. When the pool is filled
+any blocked callers are unblocked.
+
+The hybrid rate limiter tracks and accumulates both the passage of system
+time and the usage rate of this time as a measurement of progress. The
+delta between these two reference points in time captures a very simple
+and empirical value of imposed wait time.
+
+That is, the time which was allocated but which was not used always
+represents a slow down which is imposed by external factors. This
+manifests as slower response when considering the target rate to be
+equivalent to user load.
+
+## Design Details
+
+In fact, there are three pools: the _active_ pool, the _bursting_ pool,
+and the
+_waiting_ pool. The active pool has a limited size based on the number of
+operations that are allowed to be granted concurrently.
+
+The bursting pool is sized according to the relative burst rate and the
+size of the active pool. For example, with an op rate of 1000 ops/s and a
+burst rate of 1.1, the active pool can be sized to 1E9 nanos (one second
+of nanos), and the burst pool can be sized to 1E8 (1/10 of that), thus
+yielding a combined pool size of 1E9 + 1E8, or 1100000000 ns.
+
+The waiting pool is where all extra tokens are held in reserve. It is
+unlimited except by the size of a long value. The size of the waiting pool
+is a direct measure of wait time in nanoseconds.
+
+Within the pools, tokens (time) are neither created nor destroyed. They
+are added by the filler based on the passage of time, and consumed by
+callers when they become available. In between these operations, the net
+sum of tokens is preserved. In short, when time deltas are observed in the
+system clock, this time is accumulated into the available scheduling time
+of the token pools. In this way, the token pool acts as a metered
+dispenser of scheduling time to waiting (or not) consumers.
+
+The filler thread adds tokens to the pool according to the system
+real-time clock, at some estimated but unreliable interval. The frequency
+of filling is set high enough to give a reliable perception of time
+passing smoothly, but low enough to avoid wasting too much thread time in
+calling overhead. (It is set to 1K/s by default). Each time filling
+occurs, the real-time clock is check-pointed, and the time delta is fed
+into the pool filling logic as explained below.
+
+## Visual Explanation
+
+The diagram below explains the moving parts of the hybrid rate limiter.
+The arrows represent the flow of tokens (ns) as a form of scheduling
+currency.
+
+The top box shows an active token filler thread which polls the system
+clock and accumulates new time into the token pool.
+
+The bottom boxes represent concurrent readers of the token pool. These are
+typically independent threads which do a blocking read for tokens once
+they are ready to execute the rate-limited task.
+
+
+
+In the middle, the passive component in this diagram is the token pool
+itself. When the token filler adds tokens, it never blocks. However, the
+token filler can cause any readers of the token pool to unblock so that
+they can acquire newly available tokens.
+
+When time is added to the token pool, the following steps are taken:
+
+1) New tokens (based on measured time elapsed since the last fill) are
+ added to the active pool until it is full.
+2) Any extra tokens are added to the waiting pool.
+3) If the waiting pool has any tokens, and there is room in the bursting
+ pool, some tokens are moved from the waiting pool to the bursting pool
+ according to how many will fit.
+
+When a caller asks for a number of tokens, the combined total from the
+active and burst pools is available to that caller. If the number of
+tokens needed is not yet available, then the caller will block until
+tokens are added.
+
+## Bursting Logic
+
+Tokens in the waiting pool represent time that has not been claimed by a
+caller. Tokens accumulate in the waiting pool as a side-effect of
+continuous filling outpacing continuous draining, thus creating a backlog
+of operations.
+
+The pool sizes determine both the maximum instantaneously available
+operations as well as the rate at which unclaimed time can be back-filled
+back into the active or burst pools.
+
+### Normalizing for Jitter
+
+Since it is not possible to schedule the filler thread to trigger on a
+strict and reliable schedule (as in a real-time system), the method of
+moving tokens from the waiting pool to the bursting pool must account for
+differences in timing. Thus, tokens which are activated for bursting are
+scaled according to the amount of time added in the last fill, relative to
+the maximum active pool. This means that a full pool fill will allow a
+full burst pool fill, presuming wait time is positive by that amount. It
+also means that the same effect can be achieved by ten consecutive fills
+of a tenth the time each. In effect, bursting is normalized to the passage
+of time along with the burst rate, with a maximum cap imposed when
+operations are unclaimed by callers.
+
+## Mechanical Trade-offs
+
+In this implementation, it is relatively easy to explain how accuracy and
+performance trade-off. They are competing concerns. Consider these two
+extremes of an isochronous configuration:
+
+### Slow Isochronous
+
+For example, the rate limiter could be configured for strict isochronous
+behavior by setting the active pool size to *one* op of nanos and the
+burst rate to 1.0, thus disabling bursting. If the op rate requested is 1
+op/s, this configuration will work relatively well, although *any* caller
+which doesn't show up (or isn't already waiting) when the tokens become
+available will incur a wait-time penalty. The odds of this are relatively
+low for a high-velocity client.
+
+### Fast Isochronous
+
+However, if the op rate for this type of configuration is set to 1E8
+operations per second, then the filler thread will be adding 100K ops worth
+of time when there is only *one* op worth of active pool space. This is
+due to the fact that filling can only occur at a maximal frequency which
+has been set to 1K fills/s on average. That will create artificial wait
+time, since the token consumers and producers would not have enough pool
+space to hold the tokens needed during fill. It is not possible on most
+systems to fill the pool at arbitrarily high fill frequencies. Thus, it is
+important for users to understand the limits of the machinery when using
+high rates. In most scenarios, these limits will not be onerous.
+
+### Boundary Rules
+
+Taking these effects into account, the default configuration makes some
+reasonable trade-offs according to the rules below. These rules should
+work well for most rates below 50M ops/s. The net effect of these rules is
+to increase work bulking within the token pools as rates go higher.
+
+Trying to go above 50M ops/s while also forcing isochronous behavior will
+result in artificial wait-time. For this reason, the pool size itself is
+not user-configurable at this time.
+
+- The pool size will always be at least as big as two ops. This rule
+ ensures that there is adequate buffer space for tokens when callers are
+ accessing the token pools near the rate of the filler thread. If this
+ were not ensured, then artificial wait time would be injected due to
+ overflow error.
+- The pool size will always be at least as big as 1E6 nanos, or 1/1000 of
+ a second. This rule ensures that the filler thread has a reasonably
+ attainable update frequency which will prevent underflow in the active
+ or burst pools.
+- The number of ops that can fit in the pool will determine how many ops
+ can be dispatched between fills. For example, an op rate of 1E6 will
+ mean that up to 1000 ops worth of tokens may be present between fills,
+ and up to 1000 ops may be allowed to start at any time before the next
+ fill.
+
+  .1 ops/s : 20 seconds worth — 1 ops/s : 2 seconds worth — 100 ops/s :
+  .02 seconds worth
+
+In practical terms, this means that rates slower than 1K ops/s will have
+their strictness controlled by the burst rate in general, and rates faster
+than 1K ops/s will automatically include some op bulking between fills.
+
+## History
+
+A CAS-oriented method which compensated for RTC calling overhead was used
+previously. This method afforded very high performance, but it was
+difficult to reason about.
+
+This implementation replaces that previous version. Basic synchronization
+primitives (implicit locking via synchronized methods) performed
+surprisingly well -- well enough to discard the complexity of the previous
+implementation.
+
+Further, this version is much easier to study and reason about.
+
+## New Challenges
+
+While the current implementation works well for most basic cases, high CPU
+contention has shown that it can become an artificial bottleneck. Based on
+observations on higher end systems with many cores running many threads
+and high target rates, it appears that the rate limiter becomes a resource
+blocker or forces too much thread management.
+
+Strategies for handling this should be considered:
+
+1) Make callers able to pseudo-randomly (or not randomly) act as a token
+ filler, such that active consumers can do some work stealing from the
+ original token filler thread.
+2) Analyze the timing and history of a high-contention scenario for
+ weaknesses in the parameter adjustment rules above.
+3) Add internal micro-batching at the consumer interface, such that
+ contention cost is lower in general.
+4) Partition the rate limiter into multiple slices.
diff --git a/devdocs/devguide/hybrid_ratelimiter.png b/devdocs/devguide/hybrid_ratelimiter.png
new file mode 100644
index 000000000..68107623c
Binary files /dev/null and b/devdocs/devguide/hybrid_ratelimiter.png differ
diff --git a/devdocs/devguide/hybrid_ratelimiter.svg b/devdocs/devguide/hybrid_ratelimiter.svg
new file mode 100644
index 000000000..f3b9e05fc
--- /dev/null
+++ b/devdocs/devguide/hybrid_ratelimiter.svg
@@ -0,0 +1,2186 @@
+
+
diff --git a/docsys/pom.xml b/docsys/pom.xml
index 1d8744a79..281fb1882 100644
--- a/docsys/pom.xml
+++ b/docsys/pom.xml
@@ -9,7 +9,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -18,7 +18,7 @@
io.nosqlbenchnb-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
@@ -98,7 +98,7 @@
io.nosqlbenchvirtdata-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-cql-shaded/pom.xml b/driver-cql-shaded/pom.xml
index 455a6769e..e1680a6aa 100644
--- a/driver-cql-shaded/pom.xml
+++ b/driver-cql-shaded/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbenchmvn-defaults
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CqlActivityType.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CqlActivityType.java
index 7572aad8e..2b5421bd5 100644
--- a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CqlActivityType.java
+++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CqlActivityType.java
@@ -34,11 +34,6 @@ public class CqlActivityType implements ActivityType {
throw new RuntimeException("Currently, the cql activity type requires yaml/workload activity parameter.");
}
- // allow shortcut: yaml parameter provide the default alias name
- if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
- activityDef.getParams().set("alias",yaml.get());
- }
-
return new CqlActivity(activityDef);
}
diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivityType.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivityType.java
index 28d117e52..290be30b8 100644
--- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivityType.java
+++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivityType.java
@@ -3,7 +3,6 @@ package io.nosqlbench.activitytype.cqld4.core;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.type.UserDefinedType;
-import io.nosqlbench.activitytype.cqld4.codecsupport.UDTJavaType;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@@ -35,11 +34,6 @@ public class CqlActivityType implements ActivityType {
throw new RuntimeException("Currently, the cql activity type requires yaml/workload activity parameter.");
}
- // allow shortcut: yaml parameter provide the default alias name
- if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
- activityDef.getParams().set("alias",yaml.get());
- }
-
return new CqlActivity(activityDef);
}
diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java
index f00f6f2f2..06dbf2f1c 100644
--- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java
+++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java
@@ -79,7 +79,7 @@ public class CqlAsyncAction extends BaseAsyncAction {
@Override
public void startOpCycle(TrackedOp opc) {
- CqlOpData cqlop = opc.getData();
+ CqlOpData cqlop = opc.getOpData();
long cycle = opc.getCycle();
// bind timer covers all statement selection and binding, skipping, transforming logic
@@ -123,7 +123,7 @@ public class CqlAsyncAction extends BaseAsyncAction {
public void onSuccess(StartedOp sop, AsyncResultSet resultSet) {
- CqlOpData cqlop = sop.getData();
+ CqlOpData cqlop = sop.getOpData();
HashedCQLErrorHandler.resetThreadStatusCode();
if (cqlop.skipped) {
@@ -218,7 +218,7 @@ public class CqlAsyncAction extends BaseAsyncAction {
public void onFailure(StartedOp startedOp) {
- CqlOpData cqlop = startedOp.getData();
+ CqlOpData cqlop = startedOp.getOpData();
long serviceTime = startedOp.getCurrentServiceTimeNanos();
// Even if this is retryable, we expose error events
diff --git a/driver-cqlverify/pom.xml b/driver-cqlverify/pom.xml
index 0c6827e29..8fa57456b 100644
--- a/driver-cqlverify/pom.xml
+++ b/driver-cqlverify/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbenchmvn-defaults
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -24,13 +24,13 @@
io.nosqlbenchdriver-cql-shaded
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-diag/pom.xml b/driver-diag/pom.xml
index c251fb3ea..593ddcde3 100644
--- a/driver-diag/pom.xml
+++ b/driver-diag/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -20,13 +20,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-dsegraph-shaded/pom.xml b/driver-dsegraph-shaded/pom.xml
index 1737368c2..b445ca375 100644
--- a/driver-dsegraph-shaded/pom.xml
+++ b/driver-dsegraph-shaded/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbenchmvn-defaults
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-http/pom.xml b/driver-http/pom.xml
index 21500ad1a..070b676ae 100644
--- a/driver-http/pom.xml
+++ b/driver-http/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-http/src/main/java/io/nosqlbench/activitytype/http/HttpAction.java b/driver-http/src/main/java/io/nosqlbench/activitytype/http/HttpAction.java
index b97425c47..9c7464551 100644
--- a/driver-http/src/main/java/io/nosqlbench/activitytype/http/HttpAction.java
+++ b/driver-http/src/main/java/io/nosqlbench/activitytype/http/HttpAction.java
@@ -7,11 +7,10 @@ import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.BasicError;
-import io.nosqlbench.virtdata.core.templates.StringBindings;
-import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-import java.io.*;
+import java.io.FileNotFoundException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
@@ -28,14 +27,11 @@ public class HttpAction implements SyncAction {
private final HttpActivity httpActivity;
private final int slot;
private final int maxTries = 1;
- private boolean showstmts;
private OpSequence sequencer;
private HttpClient client;
private final HttpResponse.BodyHandler bodyreader = HttpResponse.BodyHandlers.ofString();
- private final long timeoutMillis=30000L;
-
public HttpAction(ActivityDef activityDef, int slot, HttpActivity httpActivity) {
this.slot = slot;
@@ -54,25 +50,15 @@ public class HttpAction implements SyncAction {
@Override
public int runCycle(long cycleValue) {
- StringBindings stringBindings;
- String statement = null;
- InputStream result = null;
+
+ // The request to be used must be constructed from the template each time.
+ HttpOp httpOp = null;
// The bind timer captures all the time involved in preparing the
// operation for execution, including data generation as well as
// op construction
-
- // The request to be used must be constructed from the template each time.
- HttpOp httpOp=null;
-
- // A specifier for what makes a response ok. If this is provided, then it is
- // either a list of valid http status codes, or if non-numeric, a regex for the body
- // which must match.
- // If not provided, then status code 200 is the only thing required to be matched.
- String ok;
-
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
- ReadyHttpOp readHTTPOperation = httpActivity.getSequencer().get(cycleValue);
+ ReadyHttpOp readHTTPOperation = sequencer.get(cycleValue);
httpOp = readHTTPOperation.apply(cycleValue);
} catch (Exception e) {
if (httpActivity.isDiagnosticMode()) {
@@ -135,98 +121,10 @@ public class HttpAction implements SyncAction {
}
}
-
-// if (ok == null) {
-// if (response.statusCode() != 200) {
-// throw new ResponseError("Result had status code " +
-// response.statusCode() + ", but 'ok' was not set for this statement," +
-// "so it is considered an error.");
-// }
-// } else {
-// String[] oks = ok.split(",");
-// for (String ok_condition : oks) {
-// if (ok_condition.charAt(0)>='0' && ok_condition.charAt(0)<='9') {
-// int matching_status = Integer.parseInt(ok_condition);
-// } else {
-// Pattern successRegex = Pattern.compile(ok);
-// }
-// }
-//// Matcher matcher = successRegex.matcher(String.valueOf(response.statusCode()));
-//// if (!matcher.matches()) {
-//// throw new BasicError("status code " + response.statusCode() + " did not match " + success);
-//// }
-// }
}
return 0;
}
-// String body = future.body();
-
-
-// String[] splitStatement = statement.split("\\?");
-// String path, query;
-//
-// path = splitStatement[0];
-// query = "";
-//
-// if (splitStatement.length >= 2) {
-// query = splitStatement[1];
-// }
-//
-// URI uri = new URI(
-// "http",
-// null,
-// host,
-// httpActivity.getPort(),
-// path,
-// query,
-// null);
-//
-// statement = uri.toString();
-//
-// showstmts = httpActivity.getShowstmts();
-
-// if (showstmts) {
-// logger.info("STMT(cycle=" + cycleValue + "):\n" + statement);
-// }
-// } catch (URISyntaxException e) {
-// e.printStackTrace();
-// }
-//
-// long nanoStartTime=System.nanoTime();
-//
-
-// Timer.Context resultTime = httpActivity.resultTimer.time();
-// try {
-// StringBuilder res = new StringBuilder();
-//
-// BufferedReader rd = new BufferedReader(new InputStreamReader(result));
-// String line;
-// while ((line = rd.readLine()) != null) {
-// res.append(line);
-// }
-// rd.close();
-//
-// } catch (Exception e) {
-// long resultNanos = resultTime.stop();
-// resultTime = null;
-// } finally {
-// if (resultTime != null) {
-// resultTime.stop();
-// }
-//
-// }
-//
-// }
-// long resultNanos = System.nanoTime() - nanoStartTime;
-// httpActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
-
-
-// protected HttpActivity getHttpActivity () {
-// return httpActivity;
-// }
-// }
-
private HttpRequest.BodyPublisher bodySourceFrom(Map cmdMap) {
if (cmdMap.containsKey("body")) {
String body = cmdMap.remove("body");
diff --git a/driver-jmx/pom.xml b/driver-jmx/pom.xml
index efd475d99..a8c0bb1b1 100644
--- a/driver-jmx/pom.xml
+++ b/driver-jmx/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-kafka/pom.xml b/driver-kafka/pom.xml
index 2790b180e..4a330b5a0 100644
--- a/driver-kafka/pom.xml
+++ b/driver-kafka/pom.xml
@@ -4,7 +4,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -44,13 +44,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdriver-stdout
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-mongodb/pom.xml b/driver-mongodb/pom.xml
index c22d56037..b8ec87e7f 100644
--- a/driver-mongodb/pom.xml
+++ b/driver-mongodb/pom.xml
@@ -7,7 +7,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -21,13 +21,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-stdout/pom.xml b/driver-stdout/pom.xml
index 7c112009f..d4745c9a9 100644
--- a/driver-stdout/pom.xml
+++ b/driver-stdout/pom.xml
@@ -7,7 +7,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivityType.java b/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivityType.java
index 6de74b7cd..0399bacf2 100644
--- a/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivityType.java
+++ b/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivityType.java
@@ -27,11 +27,6 @@ public class StdoutActivityType implements ActivityType {
".");
}
- // allow shortcut: yaml parameter provide the default alias name
- if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
- activityDef.getParams().set("alias",yaml.get());
- }
-
return new StdoutActivity(activityDef);
}
diff --git a/driver-tcp/pom.xml b/driver-tcp/pom.xml
index 43a4349ff..0c0282353 100644
--- a/driver-tcp/pom.xml
+++ b/driver-tcp/pom.xml
@@ -7,7 +7,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -24,19 +24,19 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdriver-stdout
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/driver-web/pom.xml b/driver-web/pom.xml
index bdcf5f5ac..2ff90a4a1 100644
--- a/driver-web/pom.xml
+++ b/driver-web/pom.xml
@@ -7,7 +7,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/drivers-api/pom.xml b/drivers-api/pom.xml
index 0a7a6fce0..c295faa1a 100644
--- a/drivers-api/pom.xml
+++ b/drivers-api/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbenchnb-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchvirtdata-userlibs
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/engine-api/pom.xml b/engine-api/pom.xml
index 4ef4e27aa..fa7c94a4d 100644
--- a/engine-api/pom.xml
+++ b/engine-api/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -23,25 +23,25 @@
io.nosqlbenchnb-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchdrivers-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchnb-annotations
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchvirtdata-userlibs
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/HybridRateLimiter.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/HybridRateLimiter.java
index b97cf281c..e72b533e3 100644
--- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/HybridRateLimiter.java
+++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/HybridRateLimiter.java
@@ -21,8 +21,9 @@ import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
-import org.apache.logging.log4j.Logger;
+import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicLong;
@@ -76,11 +77,12 @@ import java.util.concurrent.atomic.AtomicLong;
* overall workload is not saturating resources.
*
*/
+@Service(value = RateLimiter.class, selector = "hybrid")
public class HybridRateLimiter implements Startable, RateLimiter {
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
- private volatile TokenFiller filler;
+ //private volatile TokenFiller filler;
private volatile long starttime;
// rate controls
@@ -150,8 +152,9 @@ public class HybridRateLimiter implements Startable, RateLimiter {
}
this.rateSpec = updatingRateSpec;
- this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
- this.tokens = this.filler.getTokenPool();
+ this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
+// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
+// this.tokens = this.filler.getTokenPool();
if (this.state == State.Idle && updatingRateSpec.isAutoStart()) {
this.start();
@@ -177,7 +180,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Idle:
long nanos = getNanoClockTime();
this.starttime = nanos;
- this.filler.start();
+ this.tokens.start();
state = State.Started;
break;
}
@@ -191,7 +194,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Started:
long accumulatedWaitSinceLastStart = cumulativeWaitTimeNanos.get();
cumulativeWaitTimeNanos.set(0L);
- return this.filler.restart() + accumulatedWaitSinceLastStart;
+ return this.tokens.restart() + accumulatedWaitSinceLastStart;
default:
return 0L;
}
@@ -215,14 +218,14 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
- if (this.getRateSpec()!=null) {
+ if (this.getRateSpec() != null) {
sb.append(" spec=").append(this.getRateSpec().toString());
}
- if (this.state!=null) {
+ if (this.state != null) {
sb.append(" state=").append(this.state);
}
- if (this.filler !=null) {
- sb.append(" filler=").append(this.filler.toString());
+ if (this.tokens != null) {
+ sb.append(" tokens=").append(this.tokens.toString());
}
return sb.toString();
}
@@ -245,7 +248,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override
public Long getValue() {
- TokenPool pool = rl.filler.getTokenPool();
+ TokenPool pool = rl.tokens;
if (pool==null) {
return 0L;
}
diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/ThreadDrivenTokenPool.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/ThreadDrivenTokenPool.java
new file mode 100644
index 000000000..2605d1126
--- /dev/null
+++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/ThreadDrivenTokenPool.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Copyright 2016 jshook
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package io.nosqlbench.engine.api.activityapi.ratelimits;
+
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.nb.annotations.Service;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import static io.nosqlbench.engine.api.util.Colors.*;
+
+/**
+ *
+ * Synopsis
+ *
+ * This TokenPool represents a finite quantity which can be
+ * replenished with regular refills. Extra tokens that do not fit
+ * within the active token pool are saved in a waiting token pool and
+ * used to backfill when allowed according to the backfill rate.
+ *
+ * A detailed explanation for how this works will be included
+ * at @link "http://docs.nosqlbench.io/" under dev notes.
+ *
+ *
+ * This is the basis for the token-based rate limiters in
+ * NB. This mechanism is easily adaptable to bursting
+ * capability as well as a degree of stricter timing at speed.
+ * Various methods for doing this in a lock free way were
+ * investigated, but the intrinsic locks provided by synchronized
+ * method won out for now. This may be revisited when EB is
+ * retrofitted for J11.
+ *
+ */
+@Service(value = TokenPool.class, selector = "threaded")
+public class ThreadDrivenTokenPool implements TokenPool {
+
+ private final static Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
+
+ public static final double MIN_CONCURRENT_OPS = 2;
+
+ private long maxActivePool;
+ private long burstPoolSize;
+ private long maxOverActivePool;
+ private double burstRatio;
+ // TODO Consider removing volatile after investigating
+ private volatile long activePool;
+ private volatile long waitingPool;
+ private RateSpec rateSpec;
+ private long nanosPerOp;
+ // private long debugTrigger=0L;
+// private long debugRate=1000000000;
+ private long blocks = 0L;
+
+ private TokenFiller filler;
+ private final ActivityDef activityDef;
+
+ /**
+ * This constructor tries to pick reasonable defaults for the token pool for
+ * a given rate spec. The active pool must be large enough to contain one
+ * op worth of time, and the burst ratio determines the extra burst capacity.
+ *
+ * @param rateSpec a {@link RateSpec}
+ */
+ public ThreadDrivenTokenPool(RateSpec rateSpec, ActivityDef activityDef) {
+ this.activityDef = activityDef;
+ apply(rateSpec);
+ logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
+// filler.start();
+ }
+
+ /**
+ * Change the settings of this token pool, and wake any blocked callers
+ * just in case it allows them to proceed.
+ *
+ * @param rateSpec The rate specifier.
+ */
+ @Override
+ public synchronized TokenPool apply(RateSpec rateSpec) {
+ this.rateSpec = rateSpec;
+ this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
+ this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
+ this.burstRatio = rateSpec.getBurstRatio();
+
+ this.burstPoolSize = maxOverActivePool - maxActivePool;
+ this.nanosPerOp = rateSpec.getNanosPerOp();
+ this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
+ notifyAll();
+ return this;
+ }
+
+
+ @Override
+ public double getBurstRatio() {
+ return burstRatio;
+ }
+
+ /**
+ * Take tokens up to amt tokens from the pool and report
+ * the number of tokens removed.
+ *
+ * @param amt tokens requested
+ * @return actual number of tokens removed, greater than or equal to zero
+ */
+ @Override
+ public synchronized long takeUpTo(long amt) {
+ long take = Math.min(amt, activePool);
+ activePool -= take;
+ return take;
+ }
+
+ /**
+ * wait for the given number of tokens to be available, and then remove
+ * them from the pool.
+ *
+ * @return the total number of tokens untaken, including wait tokens
+ */
+ @Override
+ public synchronized long blockAndTake() {
+ while (activePool < nanosPerOp) {
+ blocks++;
+ //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
+ try {
+ wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
+ } catch (InterruptedException ignored) {
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ //System.out.println("waited for " + amt + "/" + activePool + " tokens");
+ }
+ //System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
+
+ activePool -= nanosPerOp;
+ return waitingPool + activePool;
+ }
+
+ @Override
+ public synchronized long blockAndTake(long tokens) {
+ while (activePool < tokens) {
+ //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
+ try {
+ wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
+ } catch (InterruptedException ignored) {
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ //System.out.println("waited for " + amt + "/" + activePool + " tokens");
+ }
+ //System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
+
+ activePool -= tokens;
+ return waitingPool + activePool;
+ }
+
+ @Override
+ public long getWaitTime() {
+ return activePool + waitingPool;
+ }
+
+ @Override
+ public long getWaitPool() {
+ return waitingPool;
+ }
+
+ @Override
+ public long getActivePool() {
+ return activePool;
+ }
+
+ /**
+ * Add the given number of new tokens to the pool, forcing any amount
+ * that would spill over the current pool size into the wait token pool, but
+ * moving up to the configured burst tokens back from the wait token pool
+ * otherwise.
+ *
+ * The amount of backfilling that occurs is controlled by the backfill ratio,
+ * based on the number of tokens submitted. This normalizes the
+ * backfilling rate to the fill rate, so that it is not sensitive to refill
+ * scheduling.
+ *
+ * @param newTokens The number of new tokens to add to the token pools
+ * @return the total number of tokens in all pools
+ */
+ public synchronized long refill(long newTokens) {
+ boolean debugthis = false;
+// long debugAt = System.nanoTime();
+// if (debugAt>debugTrigger+debugRate) {
+// debugTrigger=debugAt;
+// debugthis=true;
+// }
+
+ long needed = Math.max(maxActivePool - activePool, 0L);
+ long allocatedToActivePool = Math.min(newTokens, needed);
+ activePool += allocatedToActivePool;
+
+
+ // overflow logic
+ long allocatedToOverflowPool = newTokens - allocatedToActivePool;
+ waitingPool += allocatedToOverflowPool;
+
+ // backfill logic
+ double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
+ long burstFillAllowed = (long) (refillFactor * burstPoolSize);
+
+ burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
+ long burstFill = Math.min(burstFillAllowed, waitingPool);
+
+ waitingPool -= burstFill;
+ activePool += burstFill;
+
+ if (debugthis) {
+ System.out.print(this);
+ System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
+ if (allocatedToOverflowPool > 0) {
+ System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
+ }
+ if (burstFill > 0) {
+ System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
+ }
+ System.out.println();
+ }
+ //System.out.println(this);
+ notifyAll();
+
+ return activePool + waitingPool;
+ }
+
+ @Override
+ public String toString() {
+ return "Tokens: active=" + activePool + "/" + maxActivePool
+ + String.format(
+ " (%3.1f%%)A (%3.1f%%)B ",
+ (((double) activePool / (double) maxActivePool) * 100.0),
+ (((double) activePool / (double) maxOverActivePool) * 100.0)) + " waiting=" + waitingPool +
+ " blocks=" + blocks +
+ " rateSpec:" + ((rateSpec != null) ? rateSpec.toString() : "NULL");
+ }
+
+ @Override
+ public RateSpec getRateSpec() {
+ return rateSpec;
+ }
+
+ @Override
+ public synchronized long restart() {
+ long wait = activePool + waitingPool;
+ activePool = 0L;
+ waitingPool = 0L;
+ filler.restart();
+ return wait;
+ }
+
+ @Override
+ public void start() {
+ filler.start();
+ }
+}
diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenFiller.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenFiller.java
index 359996db3..695da698b 100644
--- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenFiller.java
+++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenFiller.java
@@ -35,7 +35,7 @@ public class TokenFiller implements Runnable {
// (false);
private final long interval = (long) 1E6;
- private final TokenPool tokenPool;
+ private final ThreadDrivenTokenPool tokenPool;
private volatile boolean running = true;
private RateSpec rateSpec;
private Thread thread;
@@ -43,23 +43,21 @@ public class TokenFiller implements Runnable {
private final Timer timer;
/**
- * A token filler adds tokens to a {@link TokenPool} at some rate.
+ * A token filler adds tokens to a {@link ThreadDrivenTokenPool} at some rate.
* By default, this rate is at least every millisecond +- scheduling jitter
* in the JVM.
*
* @param rateSpec A {@link RateSpec}
- * @param def An {@link ActivityDef}
+ * @param def An {@link ActivityDef}
*/
- public TokenFiller(RateSpec rateSpec, ActivityDef def) {
+ public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, ActivityDef def) {
this.rateSpec = rateSpec;
- this.tokenPool= new TokenPool(rateSpec);
- this.tokenPool.refill(rateSpec.getNanosPerOp());
+ this.tokenPool = tokenPool;
this.timer = ActivityMetrics.timer(def, "tokenfiller");
}
public TokenFiller apply(RateSpec rateSpec) {
this.rateSpec = rateSpec;
- this.tokenPool.apply(rateSpec);
return this;
}
@@ -99,6 +97,8 @@ public class TokenFiller implements Runnable {
}
public TokenFiller start() {
+ this.tokenPool.refill(rateSpec.getNanosPerOp());
+
thread = new Thread(this);
thread.setName(this.toString());
thread.setPriority(Thread.MAX_PRIORITY);
diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPool.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPool.java
index 1a37ab2ae..a2f02ac5d 100644
--- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPool.java
+++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPool.java
@@ -1,252 +1,25 @@
-/*
- *
- * Copyright 2016 jshook
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * /
- */
-
package io.nosqlbench.engine.api.activityapi.ratelimits;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
+public interface TokenPool {
+ TokenPool apply(RateSpec rateSpec);
-import static io.nosqlbench.engine.api.util.Colors.*;
+ double getBurstRatio();
-/**
- *
- * Synopsis
- *
- * This TokenPool represents a finite quantity which can be
- * replenished with regular refills. Extra tokens that do not fit
- * within the active token pool are saved in a waiting token pool and
- * used to backfill when allowed according to the backfill rate.
- *
- * A detailed explanation for how this works will be included
- * at @link "http://docs.nosqlbench.io/" under dev notes.
- *
- *
- * This is the basis for the token-based rate limiters in
- * NB. This mechanism is easily adaptable to bursting
- * capability as well as a degree of stricter timing at speed.
- * Various methods for doing this in a lock free way were
- * investigated, but the intrinsic locks provided by synchronized
- * method won out for now. This may be revisited when EB is
- * retrofitted for J11.
- *
- */
-public class TokenPool {
+ long takeUpTo(long amt);
- private final static Logger logger = LogManager.getLogger(TokenPool.class);
+ long blockAndTake();
- public static final double MIN_CONCURRENT_OPS = 2;
+ long blockAndTake(long tokens);
- private long maxActivePool;
- private long burstPoolSize;
- private long maxOverActivePool;
- private double burstRatio;
- // TODO Consider removing volatile after investigating
- private volatile long activePool;
- private volatile long waitingPool;
- private RateSpec rateSpec;
- private long nanosPerOp;
-// private long debugTrigger=0L;
-// private long debugRate=1000000000;
- private long blocks = 0L;
+ long getWaitTime();
+ long getWaitPool();
- /**
- * This constructor tries to pick reasonable defaults for the token pool for
- * a given rate spec. The active pool must be large enough to contain one
- * op worth of time, and the burst ratio
- *
- * @param rateSpec a {@link RateSpec}
- */
- public TokenPool(RateSpec rateSpec) {
- apply(rateSpec);
- logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
- }
+ long getActivePool();
- public TokenPool(long poolsize, double burstRatio) {
- this.maxActivePool = poolsize;
- this.burstRatio = burstRatio;
- this.maxOverActivePool = (long) (maxActivePool * burstRatio);
- this.burstPoolSize = maxOverActivePool - maxActivePool;
- }
+ RateSpec getRateSpec();
- /**
- * Change the settings of this token pool, and wake any blocked callers
- * just in case it allows them to proceed.
- *
- * @param rateSpec The rate specifier.
- */
- public synchronized void apply(RateSpec rateSpec) {
- this.rateSpec=rateSpec;
- this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
- this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
- this.burstRatio = rateSpec.getBurstRatio();
+ long restart();
- this.burstPoolSize = maxOverActivePool - maxActivePool;
- this.nanosPerOp = rateSpec.getNanosPerOp();
- notifyAll();
- }
-
-
- public double getBurstRatio() {
- return burstRatio;
- }
-
- /**
- * Take tokens up to amt tokens form the pool and report
- * the amount of token removed.
- *
- * @param amt tokens requested
- * @return actual number of tokens removed, greater to or equal to zero
- */
- public synchronized long takeUpTo(long amt) {
- long take = Math.min(amt, activePool);
- activePool -= take;
- return take;
- }
-
- /**
- * wait for the given number of tokens to be available, and then remove
- * them from the pool.
- *
- * @return the total number of tokens untaken, including wait tokens
- */
- public synchronized long blockAndTake() {
- while (activePool < nanosPerOp) {
- blocks++;
- //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
- try {
- wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
- } catch (InterruptedException ignored) {
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- //System.out.println("waited for " + amt + "/" + activePool + " tokens");
- }
- //System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
-
- activePool -= nanosPerOp;
- return waitingPool + activePool;
- }
-
- public synchronized long blockAndTake(long tokens) {
- while (activePool < tokens) {
- //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
- try {
- wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
- } catch (InterruptedException ignored) {
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- //System.out.println("waited for " + amt + "/" + activePool + " tokens");
- }
- //System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
-
- activePool -= tokens;
- return waitingPool + activePool;
- }
-
- public long getWaitTime() {
- return activePool + waitingPool;
- }
-
- public long getWaitPool() {
- return waitingPool;
- }
-
- public long getActivePool() {
- return activePool;
- }
-
- /**
- * Add the given number of new tokens to the pool, forcing any amount
- * that would spill over the current pool size into the wait token pool, but
- * moving up to the configured burst tokens back from the wait token pool
- * otherwise.
- *
- * The amount of backfilling that occurs is controlled by the backfill ratio,
- * based on the number of tokens submitted. This causes normalizes the
- * backfilling rate to the fill rate, so that it is not sensitive to refill
- * scheduling.
- *
- * @param newTokens The number of new tokens to add to the token pools
- * @return the total number of tokens in all pools
- */
- public synchronized long refill(long newTokens) {
- boolean debugthis=false;
-// long debugAt = System.nanoTime();
-// if (debugAt>debugTrigger+debugRate) {
-// debugTrigger=debugAt;
-// debugthis=true;
-// }
-
- long needed = Math.max(maxActivePool - activePool, 0L);
- long allocatedToActivePool = Math.min(newTokens, needed);
- activePool += allocatedToActivePool;
-
-
- // overflow logic
- long allocatedToOverflowPool = newTokens - allocatedToActivePool;
- waitingPool += allocatedToOverflowPool;
-
- // backfill logic
- double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
- long burstFillAllowed =(long) (refillFactor* burstPoolSize);
-
- burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
- long burstFill = Math.min(burstFillAllowed, waitingPool);
-
- waitingPool -= burstFill;
- activePool += burstFill;
-
- if (debugthis) {
- System.out.print(this);
- System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
- if (allocatedToOverflowPool>0) {
- System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
- }
- if (burstFill>0) {
- System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
- }
- System.out.println();
- }
- //System.out.println(this);
- notifyAll();
-
- return activePool+waitingPool;
- }
-
- @Override
- public String toString() {
- return "Tokens: active=" + activePool +"/" + maxActivePool
- + String.format(
- " (%3.1f%%)A (%3.1f%%)B ",
- (((double)activePool/(double)maxActivePool)*100.0),
- (((double)activePool/(double)maxOverActivePool)*100.0)) + " waiting=" + waitingPool +
- " blocks=" + blocks +
- " rateSpec:"+ ((rateSpec!=null) ? rateSpec.toString() : "NULL");
- }
-
- public RateSpec getRateSpec() {
- return rateSpec;
- }
-
- public synchronized long restart() {
- long wait=activePool+waitingPool;
- activePool=0L;
- waitingPool=0L;
- return wait;
-
- }
+ void start();
}
diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
index 884a7f569..09e00fbf0 100644
--- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
+++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -52,9 +53,23 @@ public class SimpleActivity implements Activity, ProgressCapable {
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
+ private int nameEnumerator = 0;
public SimpleActivity(ActivityDef activityDef) {
this.activityDef = activityDef;
+ if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
+ Optional workloadOpt = activityDef.getParams().getOptionalString(
+ "workload",
+ "yaml"
+ );
+ if (workloadOpt.isPresent()) {
+ activityDef.getParams().set("alias", workloadOpt.get());
+ } else {
+ activityDef.getParams().set("alias",
+ activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ + String.valueOf(nameEnumerator++));
+ }
+ }
}
public SimpleActivity(String activityDefString) {
diff --git a/engine-api/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPoolTest.java b/engine-api/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPoolTest.java
index 09def25dc..86dc50afc 100644
--- a/engine-api/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPoolTest.java
+++ b/engine-api/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/TokenPoolTest.java
@@ -17,30 +17,33 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
-import org.junit.Ignore;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TokenPoolTest {
+ ActivityDef def = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
+
@Test
public void testBackfillFullRate() {
- TokenPool p = new TokenPool(100, 1.1);
- assertThat(p.refill(100L)).isEqualTo(100L);
+ ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(10000000, 1.1), def);
+ assertThat(p.refill(1000000L)).isEqualTo(1000000L);
assertThat(p.getWaitPool()).isEqualTo(0L);
- assertThat(p.refill(100L)).isEqualTo(200);
+ assertThat(p.refill(100L)).isEqualTo(1000100);
assertThat(p.getWaitPool()).isEqualTo(90L);
- assertThat(p.refill(10L)).isEqualTo(210L);
- assertThat(p.getWaitPool()).isEqualTo(100L);
+ assertThat(p.refill(10L)).isEqualTo(1000110L);
+ assertThat(p.getWaitPool()).isEqualTo(99L);
- assertThat(p.refill(10)).isEqualTo(220L);
+ assertThat(p.refill(10)).isEqualTo(1000120L);
assertThat(p.takeUpTo(100)).isEqualTo(100L);
}
@Test
public void testTakeRanges() {
- TokenPool p = new TokenPool(100, 10);
+ ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(100, 10), def);
p.refill(100);
assertThat(p.takeUpTo(99)).isEqualTo(99L);
assertThat(p.takeUpTo(10)).isEqualTo(1L);
@@ -51,7 +54,7 @@ public class TokenPoolTest {
public void testChangedParameters() {
RateSpec s1 = new RateSpec(1000L, 1.10D);
- TokenPool p = new TokenPool(s1);
+ ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(s1, def);
long r = p.refill(10000000);
assertThat(r).isEqualTo(10000000L);
assertThat(p.getWaitTime()).isEqualTo(10000000L);
diff --git a/engine-cli/pom.xml b/engine-cli/pom.xml
index d21d57308..6bcc5e654 100644
--- a/engine-cli/pom.xml
+++ b/engine-cli/pom.xml
@@ -4,7 +4,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbenchengine-core
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOTio.nosqlbenchengine-docker
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java
index 44cfec7ff..44930bc46 100644
--- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java
+++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java
@@ -21,7 +21,7 @@ import io.nosqlbench.engine.core.script.ScenariosExecutor;
import io.nosqlbench.engine.core.script.ScriptParams;
import io.nosqlbench.engine.docker.DockerMetricsManager;
import io.nosqlbench.nb.api.annotations.Annotation;
-import io.nosqlbench.nb.api.Layer;
+import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.content.Content;
import io.nosqlbench.nb.api.content.NBIO;
import io.nosqlbench.nb.api.errors.BasicError;
@@ -89,8 +89,10 @@ public class NBCLI {
loggerConfig.setConsoleLevel(NBLogLevel.ERROR);
NBCLIOptions globalOptions = new NBCLIOptions(args, NBCLIOptions.Mode.ParseGlobalsOnly);
+ String sessionName = new SessionNamer().format(globalOptions.getSessionName());
loggerConfig
+ .setSessionName(sessionName)
.setConsoleLevel(globalOptions.getConsoleLogLevel())
.setConsolePattern(globalOptions.getConsoleLoggingPattern())
.setLogfileLevel(globalOptions.getScenarioLogLevel())
@@ -103,6 +105,7 @@ public class NBCLI {
logger = LogManager.getLogger("NBCLI");
loggerConfig.purgeOldFiles(LogManager.getLogger("SCENARIO"));
logger.info("Configured scenario log at " + loggerConfig.getLogfileLocation());
+ logger.debug("Scenario log started");
// Global only processing
if (args.length == 0) {
@@ -131,7 +134,8 @@ public class NBCLI {
logger.info("Docker metrics is enabled. Docker must be installed for this to work");
DockerMetricsManager dmh = new DockerMetricsManager();
Map dashboardOptions = Map.of(
- DockerMetricsManager.GRAFANA_TAG, globalOptions.getDockerGrafanaTag()
+ DockerMetricsManager.GRAFANA_TAG, globalOptions.getDockerGrafanaTag(),
+ DockerMetricsManager.PROM_TAG, globalOptions.getDockerPromTag()
);
dmh.startMetrics(dashboardOptions);
String warn = "Docker Containers are started, for grafana and prometheus, hit" +
@@ -144,8 +148,11 @@ public class NBCLI {
if (metricsAddr != null) {
reportGraphiteTo = metricsAddr + ":9109";
- annotatorsConfig = "[{type:'log'},{type:'grafana',baseurl:'http://" + metricsAddr + ":3000/'," +
+ annotatorsConfig = "[{type:'log',level:'info'},{type:'grafana',baseurl:'http://" + metricsAddr + ":3000" +
+ "/'," +
"tags:'appname:nosqlbench',timeoutms:5000,onerror:'warn'}]";
+ } else {
+ annotatorsConfig = "[{type:'log',level:'info'}]";
}
if (args.length > 0 && args[0].toLowerCase().equals("virtdata")) {
@@ -162,7 +169,6 @@ public class NBCLI {
}
NBCLIOptions options = new NBCLIOptions(args);
- String sessionName = new SessionNamer().format(options.getSessionName());
logger = LogManager.getLogger("NBCLI");
NBIO.addGlobalIncludes(options.wantsIncludes());
@@ -271,6 +277,7 @@ public class NBCLI {
System.exit(0);
}
+ logger.debug("initializing annotators with config:'" + annotatorsConfig + "'");
Annotators.init(annotatorsConfig);
Annotators.recordAnnotation(
Annotation.newBuilder()
@@ -330,7 +337,8 @@ public class NBCLI {
options.getProgressSpec(),
options.wantsGraaljsCompatMode(),
options.wantsStackTraces(),
- options.wantsCompileScript()
+ options.wantsCompileScript(),
+ String.join("\n", args)
);
ScriptBuffer buffer = new BasicScriptBuffer()
.add(options.getCommands().toArray(new Cmd[0]));
@@ -378,7 +386,7 @@ public class NBCLI {
scenariosResults.reportToLog();
ShutdownManager.shutdown();
- logger.info(scenariosResults.getExecutionSummary());
+// logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
Exception exception = scenariosResults.getOne().getException().get();
diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java
index c642f4cd2..6dda818f0 100644
--- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java
+++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java
@@ -4,10 +4,8 @@ import io.nosqlbench.engine.api.metrics.IndicatorMode;
import io.nosqlbench.engine.api.util.Unit;
import io.nosqlbench.engine.core.script.Scenario;
import io.nosqlbench.nb.api.Environment;
-import io.nosqlbench.nb.api.logging.NBLogLevel;
import io.nosqlbench.nb.api.errors.BasicError;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
+import io.nosqlbench.nb.api.logging.NBLogLevel;
import java.io.File;
import java.io.IOException;
@@ -37,7 +35,6 @@ public class NBCLIOptions {
private static final String METRICS_PREFIX = "--metrics-prefix";
- // private static final String ANNOTATE_TO_GRAFANA = "--grafana-baseurl";
private static final String ANNOTATE_EVENTS = "--annotate";
private static final String ANNOTATORS_CONFIG = "--annotators";
private static final String DEFAULT_ANNOTATORS = "all";
@@ -88,11 +85,12 @@ public class NBCLIOptions {
private final static String ENABLE_CHART = "--enable-chart";
private final static String DOCKER_METRICS = "--docker-metrics";
private final static String DOCKER_METRICS_AT = "--docker-metrics-at";
+ private static final String DOCKER_GRAFANA_TAG = "--docker-grafana-tag";
+ private static final String DOCKER_PROM_TAG = "--docker-prom-tag";
private static final String GRAALJS_ENGINE = "--graaljs";
private static final String NASHORN_ENGINE = "--nashorn";
private static final String GRAALJS_COMPAT = "--graaljs-compat";
- private static final String DOCKER_GRAFANA_TAG = "--docker-grafana-tag";
private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%7r %-5level [%t] %-12logger{0} %msg%n%throwable";
@@ -136,8 +134,9 @@ public class NBCLIOptions {
private final List wantsToIncludePaths = new ArrayList<>();
private Scenario.Engine engine = Scenario.Engine.Graalvm;
private boolean graaljs_compat = false;
- private int hdr_digits = 4;
- private String docker_grafana_tag = "7.2.2";
+ private int hdr_digits = 3;
+ private String docker_grafana_tag = "7.3.4";
+ private String docker_prom_tag = "latest";
private boolean showStackTraces = false;
private boolean compileScript = false;
private String scriptFile = null;
@@ -146,8 +145,8 @@ public class NBCLIOptions {
private String annotatorsConfig = "";
private String statedirs = NB_STATEDIR_PATHS;
private Path statepath;
- private List statePathAccesses = new ArrayList<>();
- private String hdrForChartFileName = DEFAULT_CHART_HDR_LOG_NAME;
+ private final List statePathAccesses = new ArrayList<>();
+ private final String hdrForChartFileName = DEFAULT_CHART_HDR_LOG_NAME;
public String getAnnotatorsConfig() {
return annotatorsConfig;
@@ -285,6 +284,10 @@ public class NBCLIOptions {
arglist.removeFirst();
workspacesDirectory = readWordOrThrow(arglist, "a workspaces directory");
break;
+ case DOCKER_PROM_TAG:
+ arglist.removeFirst();
+ docker_prom_tag = readWordOrThrow(arglist, "prometheus docker tag");
+ break;
case DOCKER_GRAFANA_TAG:
arglist.removeFirst();
docker_grafana_tag = readWordOrThrow(arglist, "grafana docker tag");
@@ -766,6 +769,10 @@ public class NBCLIOptions {
return docker_grafana_tag;
}
+ public String getDockerPromTag() {
+ return docker_prom_tag;
+ }
+
public static class LoggerConfigData {
public String file;
public String pattern = ".*";
diff --git a/engine-clients/pom.xml b/engine-clients/pom.xml
index 05532cf79..6a5061cd8 100644
--- a/engine-clients/pom.xml
+++ b/engine-clients/pom.xml
@@ -5,7 +5,7 @@
mvn-defaultsio.nosqlbench
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT../mvn-defaults
@@ -21,7 +21,7 @@
io.nosqlbenchengine-api
- 3.12.160-SNAPSHOT
+ 4.15.5-SNAPSHOT
diff --git a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/By.java b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/By.java
index 110b251de..ab5e056b3 100644
--- a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/By.java
+++ b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/By.java
@@ -1,6 +1,7 @@
package io.nosqlbench.engine.clients.grafana;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class By {
@@ -57,8 +58,11 @@ public class By {
return new By("type", "alert");
}
- public static By tags(String tag) {
- return new By("tags", tag);
+ /**
+ * Add one tag at a time, in either "tag" or "tag:value" form.
+ */
+ public static By tag(String tag) {
+ return new By("tag", tag);
}
public static By id(int id) {
@@ -69,11 +73,12 @@ public class By {
List tags = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (By by : bys) {
- if (by.key.equals("tags")) {
- tags.add(by.value.toString());
+ if (by.key.equals("tag")) {
+ tags.addAll(Arrays.asList(by.value.toString().split(",")));
+ } else {
+ sb.append(by.key).append("=").append(by.value);
+ sb.append("&");
}
- sb.append(by.key).append("=").append(by.value);
- sb.append("&");
}
for (String tag : tags) {
sb.append("tags=").append(tag).append("&");
diff --git a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaClient.java b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaClient.java
index 597977f81..419b5f0f1 100644
--- a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaClient.java
+++ b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaClient.java
@@ -2,13 +2,19 @@ package io.nosqlbench.engine.clients.grafana;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
import io.nosqlbench.engine.clients.grafana.transfer.Annotations;
import io.nosqlbench.engine.clients.grafana.transfer.ApiTokenRequest;
+import io.nosqlbench.engine.clients.grafana.transfer.DashboardResponse;
+import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
+import java.io.File;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.function.Supplier;
/**
* @see Grafana Annotations API Docs
@@ -179,6 +185,27 @@ public class GrafanaClient {
return savedGrafanaAnnotation;
}
+ public DashboardResponse getDashboardByUid(String uid) {
+ HttpClient client = config.newClient();
+ HttpRequest.Builder rqb = config.newRequest("api/dashboards/uid/" + uid);
+ rqb = rqb.GET();
+
+ HttpResponse response = null;
+ try {
+ response = client.send(rqb.build(), HttpResponse.BodyHandlers.ofString());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new RuntimeException("Getting dashboard by uid (" + uid + ") failed with status code " + response.statusCode() +
+ " at baseuri " + config.getBaseUri() + ": " + response.body());
+ }
+ String body = response.body();
+
+ DashboardResponse dashboardResponse = gson.fromJson(body, DashboardResponse.class);
+ return dashboardResponse;
+ }
+
/**
*
{@code
* POST /api/annotations/graphite
@@ -302,13 +329,52 @@ public class GrafanaClient {
throw new RuntimeException("unimplemented");
}
+ /**
+ * This can be called to create an api token and store it for later use as long as you
+ * have the admin credentials for basic auth. This is preferred to continuing to
+ * pass basic auth for admin privileges. The permissions can now be narrowed or managed
+ * in a modular way.
+ *
+ * @param namer the principal name for the privilege
+ * @param role the Grafana role
+ * @param ttl Length of validity for the granted api token
+ * @param keyfilePath The path of the token file. If it is present it will simply be used.
+ * @param un The basic auth username for the Admin role
+ * @param pw The basic auth password for the Admin role
+ */
+ public void cacheApiToken(Supplier namer, String role, long ttl, Path keyfilePath, String un, String pw) {
+ if (!Files.exists(keyfilePath)) {
+ GrafanaClientConfig basicClientConfig = config.copy();
+ basicClientConfig = basicClientConfig.basicAuth(un, pw);
+ GrafanaClient apiClient = new GrafanaClient(basicClientConfig);
+ String keyName = namer.get();
+ ApiToken apiToken = apiClient.createApiToken(keyName, role, ttl);
+ try {
+ if (keyfilePath.toString().contains(File.separator)) {
+ Files.createDirectories(keyfilePath.getParent(),
+ PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---")));
+ }
+ Files.writeString(keyfilePath, apiToken.getKey());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ GrafanaMetricsAnnotator.AuthWrapper authHeaderSupplier = new GrafanaMetricsAnnotator.AuthWrapper(
+ "Authorization",
+ new GrafanaKeyFileReader(keyfilePath),
+ s -> "Bearer " + s
+ );
+ config.addHeaderSource(authHeaderSupplier);
+ }
+
public ApiToken createApiToken(String name, String role, long ttl) {
ApiTokenRequest r = new ApiTokenRequest(name, role, ttl);
- ApiToken token = postToGrafana(r, ApiToken.class, "gen api token");
+ ApiToken token = postApiTokenRequest(r, ApiToken.class, "gen api token");
return token;
}
- private T postToGrafana(Object request, Class extends T> clazz, String desc) {
+ private T postApiTokenRequest(Object request, Class extends T> clazz, String desc) {
HttpRequest rq = config.newJsonPOST("api/auth/keys", request);
HttpClient client = config.newClient();
@@ -331,4 +397,5 @@ public class GrafanaClient {
T result = gson.fromJson(body, clazz);
return result;
}
+
}
diff --git a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaMetricsAnnotator.java b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaMetricsAnnotator.java
index 25d50aac6..c93bb1e48 100644
--- a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaMetricsAnnotator.java
+++ b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/GrafanaMetricsAnnotator.java
@@ -11,7 +11,6 @@ import io.nosqlbench.nb.api.config.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -19,11 +18,11 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
-@Service(value = Annotator.class, selector = "grafana" )
+@Service(value = Annotator.class, selector = "grafana")
public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
- private final static Logger logger = LogManager.getLogger("ANNOTATORS" );
- private final static Logger annotationsLog = LogManager.getLogger("ANNOTATIONS" );
+ private final static Logger logger = LogManager.getLogger("ANNOTATORS");
+ //private final static Logger annotationsLog = LogManager.getLogger("ANNOTATIONS" );
private OnError onError = OnError.Warn;
private GrafanaClient client;
@@ -45,9 +44,15 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
});
ga.getTags().add("layer:" + annotation.getLayer().toString());
+ if (annotation.getStart() == annotation.getEnd()) {
+ ga.getTags().add("span:instant");
+ } else {
+ ga.getTags().add("span:interval");
+ }
+
Map labels = annotation.getLabels();
- Optional.ofNullable(labels.get("alertId" ))
+ Optional.ofNullable(labels.get("alertId"))
.map(Integer::parseInt).ifPresent(ga::setAlertId);
ga.setText(annotation.toString());
@@ -56,33 +61,32 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
// Target
- Optional.ofNullable(labels.get("type" ))
+ Optional.ofNullable(labels.get("type"))
.ifPresent(ga::setType);
- Optional.ofNullable(labels.get("id" )).map(Integer::valueOf)
+ Optional.ofNullable(labels.get("id")).map(Integer::valueOf)
.ifPresent(ga::setId);
- Optional.ofNullable(labels.get("alertId" )).map(Integer::valueOf)
+ Optional.ofNullable(labels.get("alertId")).map(Integer::valueOf)
.ifPresent(ga::setAlertId);
- Optional.ofNullable(labels.get("dashboardId" )).map(Integer::valueOf)
+ Optional.ofNullable(labels.get("dashboardId")).map(Integer::valueOf)
.ifPresent(ga::setDashboardId);
- Optional.ofNullable(labels.get("panelId" )).map(Integer::valueOf)
+ Optional.ofNullable(labels.get("panelId")).map(Integer::valueOf)
.ifPresent(ga::setPanelId);
- Optional.ofNullable(labels.get("userId" )).map(Integer::valueOf)
+ Optional.ofNullable(labels.get("userId")).map(Integer::valueOf)
.ifPresent(ga::setUserId);
- Optional.ofNullable(labels.get("userName" ))
+ Optional.ofNullable(labels.get("userName"))
.ifPresent(ga::setUserName);
- Optional.ofNullable(labels.get("metric" ))
+ Optional.ofNullable(labels.get("metric"))
.ifPresent(ga::setMetric);
// Details
- annotationsLog.info("ANNOTATION:" + ga.toString());
GrafanaAnnotation created = this.client.createAnnotation(ga);
} catch (Exception e) {
@@ -110,58 +114,65 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
GrafanaClientConfig gc = new GrafanaClientConfig();
gc.setBaseUri(cfg.param("baseurl", String.class));
- if (cfg.containsKey("tags" )) {
+ if (cfg.containsKey("tags")) {
this.tags = ParamsParser.parse(cfg.param("tags", String.class), false);
}
- if (cfg.containsKey("username" )) {
- if (cfg.containsKey("password" )) {
+ if (cfg.containsKey("username")) {
+ if (cfg.containsKey("password")) {
gc.basicAuth(
cfg.param("username", String.class),
cfg.param("password", String.class)
);
} else {
- gc.basicAuth(cfg.param("username", String.class), "" );
+ gc.basicAuth(cfg.param("username", String.class), "");
}
}
Path keyfilePath = null;
- if (cfg.containsKey("apikeyfile" )) {
+ if (cfg.containsKey("apikeyfile")) {
String apikeyfile = cfg.paramEnv("apikeyfile", String.class);
keyfilePath = Path.of(apikeyfile);
- } else if (cfg.containsKey("apikey" )) {
+ } else if (cfg.containsKey("apikey")) {
gc.addHeaderSource(() -> Map.of("Authorization", "Bearer " + cfg.param("apikey", String.class)));
} else {
- Optional apikeyLocation = Environment.INSTANCE.interpolate("$NBSTATEDIR/grafana_apikey" );
+ Optional apikeyLocation = Environment.INSTANCE
+ .interpolate(cfg.paramEnv("apikeyfile", String.class));
keyfilePath = apikeyLocation.map(Path::of).orElseThrow();
}
- if (!Files.exists(keyfilePath)) {
- logger.info("Auto-configuring grafana apikey." );
- GrafanaClientConfig apiClientConf = gc.copy().basicAuth("admin", "admin" );
- GrafanaClient apiClient = new GrafanaClient(apiClientConf);
- try {
- String nodeId = SystemId.getNodeId();
- String keyName = "nosqlbench-" + nodeId + "-" + System.currentTimeMillis();
- ApiToken apiToken = apiClient.createApiToken(keyName, "Admin", Long.MAX_VALUE);
- Files.writeString(keyfilePath, apiToken.getKey());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+// if (!Files.exists(keyfilePath)) {
+// logger.info("Auto-configuring grafana apikey.");
+// GrafanaClientConfig apiClientConf = gc.copy().basicAuth("admin", "admin");
+// GrafanaClient apiClient = new GrafanaClient(apiClientConf);
+// try {
+// String nodeId = SystemId.getNodeId();
+//
+// String keyName = "nosqlbench-" + nodeId + "-" + System.currentTimeMillis();
+// ApiToken apiToken = apiClient.createApiToken(keyName, "Admin", Long.MAX_VALUE);
+// Files.createDirectories(keyfilePath.getParent(),
+// PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---")));
+// Files.writeString(keyfilePath, apiToken.getKey());
+// } catch (Exception e) {
+// throw new RuntimeException(e);
+// }
+// }
+//
+// AuthWrapper authHeaderSupplier = new AuthWrapper(
+// "Authorization",
+// new GrafanaKeyFileReader(keyfilePath),
+// s -> "Bearer " + s
+// );
+// gc.addHeaderSource(authHeaderSupplier);
- AuthWrapper authHeaderSupplier = new AuthWrapper(
- "Authorization",
- new GrafanaKeyFileReader(keyfilePath),
- s -> "Bearer " + s
- );
- gc.addHeaderSource(authHeaderSupplier);
-
- this.onError = OnError.valueOfName(cfg.get("onerror" ).toString());
+ this.onError = OnError.valueOfName(cfg.get("onerror").toString());
this.client = new GrafanaClient(gc);
+ String keyName = "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
+ Supplier namer = () -> "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
+ this.client.cacheApiToken(namer, "Admin", Long.MAX_VALUE, keyfilePath, "admin", "admin");
}
@@ -169,22 +180,22 @@ public class GrafanaMetricsAnnotator implements Annotator, ConfigAware {
public ConfigModel getConfigModel() {
return new MutableConfigModel(this)
.required("baseurl", String.class,
- "The base url of the grafana node, like http://localhost:3000/" )
- .defaultto("apikeyfile", "$NBSTATEDIR/grafana_apikey",
- "The file that contains the api key, supersedes apikey" )
+ "The base url of the grafana node, like http://localhost:3000/")
+ .defaultto("apikeyfile", "$NBSTATEDIR/grafana/grafana_apikey",
+ "The file that contains the api key, supersedes apikey")
.optional("apikey", String.class,
- "The api key to use, supersedes basic username and password" )
+ "The api key to use, supersedes basic username and password")
.optional("username", String.class,
- "The username to use for basic auth" )
+ "The username to use for basic auth")
.optional("password", String.class,
- "The password to use for basic auth" )
+ "The password to use for basic auth")
.defaultto("tags", "source:nosqlbench",
- "The tags that identify the annotations, in k:v,... form" )
+ "The tags that identify the annotations, in k:v,... form")
// .defaultto("onerror", OnError.Warn)
.defaultto("onerror", "warn",
- "What to do when an error occurs while posting an annotation" )
+ "What to do when an error occurs while posting an annotation")
.defaultto("timeoutms", 5000,
- "connect and transport timeout for the HTTP client" )
+ "connect and transport timeout for the HTTP client")
.asReadOnly();
}
diff --git a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/analyzer/GrafanaRegionAnalyzer.java b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/analyzer/GrafanaRegionAnalyzer.java
new file mode 100644
index 000000000..70a061c0c
--- /dev/null
+++ b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/analyzer/GrafanaRegionAnalyzer.java
@@ -0,0 +1,42 @@
+package io.nosqlbench.engine.clients.grafana.analyzer;
+
+import io.nosqlbench.engine.clients.grafana.By;
+import io.nosqlbench.engine.clients.grafana.GrafanaClient;
+import io.nosqlbench.engine.clients.grafana.GrafanaClientConfig;
+import io.nosqlbench.engine.clients.grafana.transfer.Annotations;
+import io.nosqlbench.engine.clients.grafana.transfer.DashboardResponse;
+import io.nosqlbench.engine.clients.grafana.transfer.GrafanaAnnotation;
+import io.nosqlbench.nb.api.SystemId;
+
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class GrafanaRegionAnalyzer {
+
+ public static void main(String[] args) {
+
+ GrafanaClientConfig ccfg = new GrafanaClientConfig();
+ ccfg.setBaseUri("http://18.191.247.162:3000/");
+ GrafanaClient gclient = new GrafanaClient(ccfg);
+ Supplier namer = () -> "nosqlbench-" + SystemId.getNodeId() + "-" + System.currentTimeMillis();
+ gclient.cacheApiToken(namer, "Admin", Long.MAX_VALUE, Path.of("grafana_apikey"), "admin", "admin");
+
+
+ DashboardResponse dashboardResponse = gclient.getDashboardByUid("aIIX1f6Wz");
+
+
+ Annotations annotations = gclient.findAnnotations(By.tag("appname:nosqlbench,layer:Activity"));
+ List mainActivities = annotations.stream()
+ .filter(s -> s.getTagMap().getOrDefault("alias", "").contains("main"))
+ .sorted(Comparator.comparing(t -> t.getTime()))
+ .collect(Collectors.toList());
+ System.out.println("end");
+ GrafanaAnnotation last = mainActivities.get(mainActivities.size() - 1);
+ long start = last.getTime();
+ long end = last.getTimeEnd();
+
+ }
+}
diff --git a/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/transfer/Dashboard.java b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/transfer/Dashboard.java
new file mode 100644
index 000000000..10c9bfe25
--- /dev/null
+++ b/engine-clients/src/main/java/io/nosqlbench/engine/clients/grafana/transfer/Dashboard.java
@@ -0,0 +1,28 @@
+package io.nosqlbench.engine.clients.grafana.transfer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Dashboard {
+ Map annotations = new HashMap<>();
+ String description;
+ boolean editable;
+ long graphToolTip;
+ long id;
+ long iteration;
+ List