Merge pull request #1497 from nosqlbench/jshook/update-paging-fix

Jshook/update paging fix
This commit is contained in:
Jonathan Shook
2023-08-30 13:20:47 -05:00
committed by GitHub
37 changed files with 474 additions and 222 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,13 +19,23 @@ package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4CqlReboundStatement(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int lwtRetryCount, BoundStatement rebound, RSProcessors processors) {
super(session,maxPages,retryReplace,maxLwtRetries,lwtRetryCount, processors);
public Cqld4CqlReboundStatement(
CqlSession session,
int maxPages,
boolean retryReplace,
int maxLwtRetries,
int lwtRetryCount,
BoundStatement rebound,
RSProcessors processors,
CqlOpMetrics metrics
) {
super(session,maxPages,retryReplace,maxLwtRetries,lwtRetryCount, processors, metrics);
this.stmt = rebound;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import java.util.Map;
@@ -29,7 +29,7 @@ public class Cqld4PrintProcessor implements ResultSetProcessor {
}
@Override
public void start(long cycle, ResultSet container) {
public void start(long cycle, AsyncResultSet container) {
sb.setLength(0);
sb.append("c[").append(cycle).append("] ");
}

View File

@@ -290,6 +290,8 @@ public class Cqld4Space implements AutoCloseable {
.add(Param.optional("whitelist", String.class, "list of whitelist hosts addresses"))
.add(Param.optional("showstmt", Boolean.class, "show the contents of the statement in the log"))
.add(Param.optional("cloud_proxy_address", String.class, "Cloud Proxy Address"))
.add(Param.optional("maxpages", Integer.class, "Maximum number of pages allowed per CQL request"))
.add(Param.optional("maxretryreplace", Integer.class, "Maximum number of retry replaces with LWT for a CQL request"))
.add(SSLKsFactory.get().getConfigModel())
.add(getDriverOptionsModel())
.add(new OptionHelpers(new OptionsMap()).getConfigModel())

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapters.api.activityimpl.uniform.ResultProcessor;
@@ -25,7 +26,7 @@ import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RSProcessors implements ResultProcessor<ResultSet,Row>, Supplier<List<ResultSetProcessor>> {
public class RSProcessors implements ResultProcessor<AsyncResultSet,Row>, Supplier<List<ResultSetProcessor>> {
private final List<Supplier<ResultSetProcessor>> suppliers = new ArrayList<>();
@@ -47,7 +48,7 @@ public class RSProcessors implements ResultProcessor<ResultSet,Row>, Supplier<Li
}
@Override
public void start(long cycle, ResultSet container) {
public void start(long cycle, AsyncResultSet container) {
for (ResultSetProcessor processor : get()) {
processor.start(cycle, container);
}

View File

@@ -16,9 +16,9 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapters.api.activityimpl.uniform.ResultProcessor;
public interface ResultSetProcessor extends ResultProcessor<ResultSet, Row> {
public interface ResultSetProcessor extends ResultProcessor<AsyncResultSet, Row> {
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
@@ -27,16 +28,16 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
*/
public class ChangeUnappliedCycleException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final AsyncResultSet resultSet;
private final String queryString;
public ChangeUnappliedCycleException(ResultSet resultSet, String queryString) {
public ChangeUnappliedCycleException(AsyncResultSet resultSet, String queryString) {
super("Operation was not applied:" + queryString);
this.resultSet = resultSet;
this.queryString = queryString;
}
public ResultSet getResultSet() {
public AsyncResultSet getResultSet() {
return resultSet;
}
public String getQueryString() { return queryString; }

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
@@ -27,18 +28,18 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
*/
public class ExceededRetryReplaceException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final AsyncResultSet resultSet;
private final String queryString;
private final int retries;
public ExceededRetryReplaceException(ResultSet resultSet, String queryString, int retries) {
public ExceededRetryReplaceException(AsyncResultSet resultSet, String queryString, int retries) {
super("After " + retries + " retries using the retryreplace option, Operation was not applied:" + queryString);
this.retries = retries;
this.resultSet = resultSet;
this.queryString = queryString;
}
public ResultSet getResultSet() {
public AsyncResultSet getResultSet() {
return resultSet;
}
public String getQueryString() { return queryString; }

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
/**
* <p>This is not a core exception. It was added to the CQL activity type
@@ -35,14 +35,14 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
*/
public class UnexpectedPagingException extends RuntimeException {
private final ResultSet resultSet;
private final AsyncResultSet resultSet;
private final String queryString;
private final int fetchSize;
private final int fetchedPages;
private final int maxpages;
public UnexpectedPagingException(
ResultSet resultSet,
AsyncResultSet resultSet,
String queryString,
int fetchedPages,
int maxpages,
@@ -54,16 +54,15 @@ public class UnexpectedPagingException extends RuntimeException {
this.fetchSize = fetchSize;
}
public ResultSet getResultSet() {
public AsyncResultSet getResultSet() {
return resultSet;
}
public String getMessage() {
StringBuilder sb = new StringBuilder();
sb.append("Additional paging would be required to read the results from this query fully" +
", but the user has not explicitly indicated that paging was expected.")
.append(" fetched/allowed: ").append(fetchedPages).append("/").append(maxpages)
.append(" fetchSize(").append(fetchSize).append("): ").append(queryString);
return sb.toString();
String sb = "Additional paging would be required to read the results from this query fully" +
", but the user has not explicitly indicated that paging was expected." +
" fetched/allowed: " + fetchedPages + "/" + maxpages +
" fetchSize(" + fetchSize + "): " + queryString;
return sb;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,14 +14,13 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4;
package io.nosqlbench.adapter.cqld4.instruments;
public class Cqld4OpMetrics {
public void onStart() {
public interface CqlOpMetrics {
}
void recordFetchedPages(int fetchedPages);
public void onSuccess() {
void recordFetchedRows(int fetchedRows);
}
void recordFetchedBytes(int fetchedBytes);
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.codahale.metrics.Histogram;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
@@ -23,12 +24,13 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.Cqld4Space;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,15 +39,17 @@ import java.time.Duration;
import java.util.Map;
import java.util.function.LongFunction;
public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, Cqld4Space> {
public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, Cqld4Space> implements CqlOpMetrics {
private final static Logger logger = LogManager.getLogger("CQLD4");
private final int maxpages;
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
private final LongFunction<CqlSession> sessionFunc;
private final boolean isRetryReplace;
private final int maxLwtRetries;
private final Histogram rowsHistogram;
private final Histogram pagesHistogram;
private final Histogram payloadBytesHistogram;
public Cqld4BaseOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
super(adapter, op);
@@ -53,6 +57,9 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
this.maxpages = op.getStaticConfigOr("maxpages", 1);
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);
this.rowsHistogram = ActivityMetrics.histogram(op, "rows", op.getStaticConfigOr("hdr_digits", 3));
this.pagesHistogram = ActivityMetrics.histogram(op, "pages", op.getStaticConfigOr("hdr_digits", 3));
this.payloadBytesHistogram = ActivityMetrics.histogram(op, "payload_bytes", op.getStaticConfigOr("hdr_digits", 3));
}
public int getMaxPages() {
@@ -76,7 +83,7 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
* All implementations of a CQL Statement Dispenser should be using the method
* provided by this function. This ensures that {@link Statement}-level attributes
* are handled uniformly and in one place.
*
* <p>
* This takes the base statement function and decorates it optionally with each
* additional qualified modifier, short-circuiting those which are not specified.
* This allows default behavior to take precedence as well as avoids unnecessary calling
@@ -136,5 +143,18 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
return sb.toString();
}
@Override
public void recordFetchedRows(int rows) {
rowsHistogram.update(rows);
}
@Override
public void recordFetchedPages(int pages) {
pagesHistogram.update(pages);
}
@Override
public void recordFetchedBytes(int bytes) {
payloadBytesHistogram.update(bytes);
}
}

View File

@@ -91,7 +91,8 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
getMaxPages(),
isRetryReplace(),
getMaxLwtRetries(),
processors
processors,
this
);
} catch (Exception exception) {
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(

View File

@@ -50,7 +50,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
(SimpleStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace(),
getMaxLwtRetries()
getMaxLwtRetries(),
this
);
}

View File

@@ -47,7 +47,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
(SimpleStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace(),
getMaxLwtRetries()
getMaxLwtRetries(),
this
);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,13 +19,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
private final BatchStatement stmt;
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxPage, int maxLwtRetries, boolean retryReplace) {
super(session,maxPage,retryReplace,maxLwtRetries,new RSProcessors());
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxPage, int maxLwtRetries, boolean retryReplace, CqlOpMetrics metrics) {
super(session,maxPage,retryReplace,maxLwtRetries,new RSProcessors(), metrics);
this.stmt = stmt;
}

View File

@@ -17,21 +17,28 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.*;
import io.nosqlbench.adapter.cqld4.Cqld4CqlReboundStatement;
import io.nosqlbench.adapter.cqld4.LWTRebinder;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
// TODO: add statement filtering
@@ -45,87 +52,93 @@ import java.util.Map;
public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture, OpGenerator, OpResultSize {
private final static Logger logger = LogManager.getLogger(Cqld4CqlOp.class);
private final CqlSession session;
private final int maxPages;
private final boolean retryReplace;
private final int maxLwtRetries;
private int retryReplaceCount =0;
private ResultSet rs;
private Cqld4CqlOp nextOp;
private final RSProcessors processors;
private final CqlOpMetrics metrics;
private int retryReplaceCount = 0;
private Cqld4CqlOp nextOp;
private int fetchedPages = 0;
private int fetchedRows = 0;
private int fetchedBytes = 0;
private int lwtRetries = 0;
private final ThreadLocal<List<Row>> results = new ThreadLocal<>();
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
public Cqld4CqlOp(
CqlSession session,
int maxPages,
boolean retryReplace,
int maxLwtRetries,
RSProcessors processors,
CqlOpMetrics metrics
) {
this.session = session;
this.maxPages = maxPages;
this.retryReplace = retryReplace;
this.maxLwtRetries =maxLwtRetries;
this.maxLwtRetries = maxLwtRetries;
this.processors = processors;
this.metrics = metrics;
}
protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
protected Cqld4CqlOp(
CqlSession session,
int maxPages,
boolean retryReplace,
int maxLwtRetries,
int retryReplaceCount,
RSProcessors processors,
CqlOpMetrics metrics
) {
this.session = session;
this.maxPages = maxPages;
this.retryReplace = retryReplace;
this.maxLwtRetries =maxLwtRetries;
this.retryReplaceCount=retryRplaceCount;
this.maxLwtRetries = maxLwtRetries;
this.retryReplaceCount = retryReplaceCount;
this.processors = processors;
this.metrics = metrics;
}
public final List<Row> apply(long cycle) {
Statement<?> stmt = getStmt();
rs = session.execute(stmt);
processors.start(cycle, rs);
int totalRows = 0;
Statement<?> statement = getStmt();
logger.trace(() -> "apply() invoked, statement obtained, executing async with page size: " + statement.getPageSize() + " thread local rows: ");
CompletionStage<AsyncResultSet> statementStage = session.executeAsync(statement);
if (!rs.wasApplied()) {
if (!retryReplace) {
throw new ChangeUnappliedCycleException(rs, getQueryString());
} else {
retryReplaceCount++;
if (retryReplaceCount >maxLwtRetries) {
throw new ExceededRetryReplaceException(rs,getQueryString(), retryReplaceCount);
}
Row one = rs.one();
processors.buffer(one);
totalRows++;
nextOp = this.rebindLwt(stmt, one);
CompletionStage<List<Row>> rowsStage = statementStage.thenCompose((rs) -> {
processors.start(cycle, rs);
ArrayList<Row> completeRowSet = new ArrayList<>();
if (!rs.wasApplied()) {
handleRebindLWT(rs, statement);
}
return collect(rs, completeRowSet, cycle);
}).exceptionally(throwable -> {
if (throwable instanceof RuntimeException tre) throw tre;
throw new RuntimeException(throwable);
});
try {
return rowsStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof RuntimeException re) throw re;
throw new RuntimeException(e);
} finally {
processors.flush();
metrics.recordFetchedPages(fetchedPages);
metrics.recordFetchedRows(fetchedRows);
metrics.recordFetchedBytes(fetchedBytes);
}
// Paginated Op
Iterator<Row> reader = rs.iterator();
int pages = 0;
// TODO/MVEL: An optimization to this would be to collect the results in a result set processor,
// but allow/require this processor to be added to an op _only_ in the event that it would
// be needed by a downstream consumer like the MVEL expected result evaluator
var resultRows = new ArrayList<Row>();
while (true) {
int pageRows = rs.getAvailableWithoutFetching();
for (int i = 0; i < pageRows; i++) {
Row row = reader.next();
resultRows.add(row);
processors.buffer(row);
}
if (pages++ > maxPages) {
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize());
}
if (rs.isFullyFetched()) {
results.set(resultRows);
break;
}
totalRows += pageRows;
}
processors.flush();
return results.get();
// logger.trace(() -> "\n\n--- Rows collected for cycle: " + cycle + " count: "
// + rs.size() + " dt: " + System.nanoTime());
//
// results.set(completeRowSet);
// processors.flush();
}
// private BiFunction<AsyncResultSet,Throwable> handler
@Override
public Op getNextOp() {
Op next = nextOp;
@@ -133,12 +146,8 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
return next;
}
@Override
public Map<String, ?> capture() {
if (rs == null) {
throw new UndefinedResultSetException(this);
}
return null;
throw new NotImplementedException("Not implemented for Cqld4CqlOp");
}
public abstract Statement<?> getStmt();
@@ -147,7 +156,44 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors);
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors, metrics);
}
private CompletionStage<List<Row>> collect(AsyncResultSet resultSet, ArrayList<Row> rowList, final long cycle) {
fetchedBytes+=resultSet.getExecutionInfo().getResponseSizeInBytes();
if (++fetchedPages > maxPages) {
throw new UnexpectedPagingException(resultSet, getQueryString(), fetchedPages, maxPages, getStmt().getPageSize());
}
int remaining = resultSet.remaining();
fetchedRows += remaining;
rowList.ensureCapacity(rowList.size() + remaining);
for (Row row : resultSet.currentPage()) {
rowList.add(row);
processors.buffer(row);
}
if (resultSet.hasMorePages()) {
return resultSet.fetchNextPage().thenCompose(rs -> collect(rs, rowList, cycle));
} else {
processors.start(cycle, resultSet);
return CompletableFuture.completedStage(rowList);
}
}
private void handleRebindLWT(AsyncResultSet resultSet, Statement<?> statement) {
if (++lwtRetries < maxLwtRetries) {
throw new ExceededRetryReplaceException(resultSet, getQueryString(), lwtRetries);
}
if (!retryReplace) {
throw new ChangeUnappliedCycleException(resultSet, getQueryString());
} else {
retryReplaceCount++;
if (retryReplaceCount > maxLwtRetries) {
throw new ExceededRetryReplaceException(resultSet, getQueryString(), retryReplaceCount);
}
Row one = resultSet.one();
processors.buffer(one);
nextOp = this.rebindLwt(statement, one);
}
}
}

View File

@@ -19,13 +19,15 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
super(session, maxPages, retryReplace, maxLwtRetries, processors);
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages,
boolean retryReplace, int maxLwtRetries, RSProcessors processors, CqlOpMetrics metrics) {
super(session, maxPages, retryReplace, maxLwtRetries, processors, metrics);
this.stmt = stmt;
}
@@ -37,4 +39,5 @@ public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
public String getQueryString() {
return stmt.getPreparedStatement().getQuery();
}
}

View File

@@ -19,12 +19,13 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.instruments.CqlOpMetrics;
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
private final SimpleStatement stmt;
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries) {
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors());
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, CqlOpMetrics metrics) {
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors(), metrics);
this.stmt = stmt;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.adapter.cqld4.processors;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
import io.nosqlbench.virtdata.core.templates.CapturePoint;
@@ -32,7 +32,7 @@ public class CqlFieldCaptureProcessor implements ResultSetProcessor {
}
@Override
public void start(long cycle, ResultSet container) {
public void start(long cycle, AsyncResultSet container) {
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.adapter.cqld4.processors;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
@@ -28,12 +28,11 @@ import java.util.ArrayList;
public class RSIterableCapture implements ResultSetProcessor {
private long cycle;
private ArrayList<Row> rows = new ArrayList<>();
private final ArrayList<Row> rows = new ArrayList<>();
@Override
public void start(long cycle, ResultSet container) {
public void start(long cycle, AsyncResultSet container) {
this.cycle = cycle;
rows = new ArrayList<Row>(container.getAvailableWithoutFetching());
}
@Override

View File

@@ -1,5 +1,5 @@
description: |
A cql-starter workload.
A cql-starter workload.
* Cassandra: 3.x, 4.x.
* DataStax Enterprise: 6.8.x.
* DataStax Astra.
@@ -41,9 +41,9 @@ blocks:
AND durable_writes = true;
create-table: |
create table if not exists <<keyspace:starter>>.<<table:cqlstarter>> (
machine_id UUID,
message text,
time timestamp,
machine_id UUID,
message text,
time timestamp,
PRIMARY KEY ((machine_id), time)
) WITH CLUSTERING ORDER BY (time DESC);
@@ -53,9 +53,9 @@ blocks:
ops:
create-table-astra: |
create table if not exists <<keyspace:starter>>.<<table:cqlstarter>> (
machine_id UUID,
machine_id UUID,
message text,
time timestamp,
time timestamp,
PRIMARY KEY ((machine_id), time)
) WITH CLUSTERING ORDER BY (time DESC);
@@ -65,7 +65,7 @@ blocks:
idempotent: true
ops:
insert-rampup: |
insert into <<keyspace:starter>>.<<table:cqlstarter>> (machine_id, message, time)
insert into <<keyspace:starter>>.<<table:cqlstarter>> (machine_id, message, time)
values ({machine_id}, {rampup_message}, {time}) using timestamp {ts};
rampdown:
@@ -89,4 +89,4 @@ blocks:
ops:
insert-main: |
insert into <<keyspace:starter>>.<<table:cqlstarter>>
(machine_id, message, time) values ({machine_id}, {message}, {time}) using timestamp {ts};
(machine_id, message, time) values ({machine_id}, {message}, {time}) using timestamp {ts};

View File

@@ -180,7 +180,7 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
final int hdrDigits = pop.getStaticConfigOr("hdr_digits", 4).intValue();
successTimer = ActivityMetrics.timer(pop, "success", hdrDigits);
errorTimer = ActivityMetrics.timer(pop, "error", hdrDigits);
resultSizeHistogram = ActivityMetrics.histogram(pop, "resultset-size", hdrDigits);
resultSizeHistogram = ActivityMetrics.histogram(pop, "resultset_size", hdrDigits);
}
}

View File

@@ -39,7 +39,7 @@ public class ExceptionHistoMetrics {
public ExceptionHistoMetrics(final NBLabeledElement parentLabels, final ActivityDef activityDef) {
this.parentLabels = parentLabels;
this.activityDef = activityDef;
this.allerrors = ActivityMetrics.histogram(parentLabels, "errorhistos.ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
this.allerrors = ActivityMetrics.histogram(parentLabels, "errorhistos_ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
}
public void update(final String name, final long magnitude) {
@@ -47,7 +47,7 @@ public class ExceptionHistoMetrics {
if (null == h) synchronized (this.histos) {
h = this.histos.computeIfAbsent(
name,
k -> ActivityMetrics.histogram(this.parentLabels, "errorhistos." + name, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
errName -> ActivityMetrics.histogram(this.parentLabels, "errorhistos_"+errName, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
);
}
h.update(magnitude);

View File

@@ -207,7 +207,7 @@ public class NBCLIScenarioParser {
alias = alias.replaceAll("STEP", sanitize(stepName));
alias = (alias.startsWith("alias=") ? alias : "alias=" + alias);
buildingCmd.put("alias", alias);
buildingCmd.put("labels","labels=workload:"+sanitize(workloadToken));
buildingCmd.put("labels","labels=workload:$"+sanitize(workloadToken));
logger.debug(() -> "rebuilt command: " + String.join(" ", buildingCmd.values()));
buildCmdBuffer.addAll(buildingCmd.values());
@@ -221,11 +221,12 @@ public class NBCLIScenarioParser {
public static String sanitize(String word) {
String sanitized = word;
sanitized = sanitized.replaceAll("\\..+$", "");
String shortened = sanitized;
sanitized = sanitized.replaceAll("-","_");
sanitized = sanitized.replaceAll("[^a-zA-Z0-9_]+", "");
if (!word.equals(sanitized)) {
logger.warn("The identifier or value '" + word + "' was sanitized to '" + sanitized + "' to be compatible with monitoring systems. You should probably change this to make diagnostics easier.");
if (!shortened.equals(sanitized)) {
logger.warn("The identifier or value '" + shortened + "' was sanitized to '" + sanitized + "' to be compatible with monitoring systems. You should probably change this to make diagnostics easier.");
}
return sanitized;
}

View File

@@ -160,11 +160,8 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
this.sessionCode = SystemId.genSessionCode(sessionTime);
this.sessionName = SessionNamer.format(globalOptions.getSessionName(),sessionTime).replaceAll("SESSIONCODE",sessionCode);
this.labels = NBLabels.forKV("command", commandName, "appname", "nosqlbench")
this.labels = NBLabels.forKV("appname", "nosqlbench")
.andInstances("node",SystemId.getNodeId())
.andInstances("nodeid",SystemId.getPackedNodeId())
// .andInstances("sesscode",sessionCode)
.andInstances("session",sessionName)
.and(globalOptions.getLabelMap());
NBCLI.loggerConfig

View File

@@ -107,7 +107,7 @@ public class NBCLIScenarioParserTest {
"cycles", "20",
"cycles-test", "20",
"driver", "stdout",
"labels","workload:scenario_test",
"labels","workload:$scenario_test",
"workload", "scenario-test"
));
}
@@ -121,7 +121,7 @@ public class NBCLIScenarioParserTest {
"alias", "schema",
"cycles-test", "20",
"driver", "stdout",
"labels","workload:scenario_test",
"labels","workload:$scenario_test",
"tags", "block:\"schema.*\"",
"workload", "scenario-test"
));
@@ -172,7 +172,7 @@ public class NBCLIScenarioParserTest {
"alias", "schema",
"cycles-test", "20",
"driver", "stdout",
"labels","workload:scenario_test",
"labels","workload:$scenario_test",
"tags", "block:\"schema.*\"",
"workload", "scenario-test"
));

View File

@@ -30,6 +30,7 @@ import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.scenario.Scenario;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,7 +46,8 @@ import java.util.stream.Collectors;
* <p>In order to allow for dynamic thread management, which is not easily supported as an explicit feature
* of most executor services, threads are started as long-running processes and managed via state signaling.
* The {@link RunState} enum, {@link MotorState} type, and {@link RunStateTally}
* state tracking class are used together to represent valid states and transitions, contain and transition state atomically,
* state tracking class are used together to represent valid states and transitions, contain and transition state
* atomically,
* and provide blocking conditions for observers, respectively.</p>
*
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
@@ -70,6 +72,8 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
private long startedAt = 0L;
private long stoppedAt = 0L;
private ActivityExecutorShutdownHook shutdownHook = null;
public ActivityExecutor(Activity activity, String sessionId) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
@@ -87,7 +91,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
/**
* Simply stop the motors
*/
public void stopActivity() {
public void stopActivity() {
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
@@ -125,7 +129,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.element(this)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.detail("params", getActivityDef().toString())
@@ -389,6 +393,17 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
@Override
public ExecutionResult call() throws Exception {
shutdownHook=new ActivityExecutorShutdownHook(this);
Runtime.getRuntime().addShutdownHook(shutdownHook);
long startAt = System.currentTimeMillis();
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Activity)
.detail("event", "start-activity")
.detail("params", activityDef.toString())
.build());
try {
// instantiate and configure fixtures that need to be present
@@ -402,11 +417,13 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
} catch (Exception e) {
this.exception = e;
} finally {
stoppedAt=System.currentTimeMillis();
activity.shutdownActivity();
activity.closeAutoCloseables();
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
finish();
return result;
}
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
return result;
}
/**
@@ -530,4 +547,23 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
public NBLabels getLabels() {
return activity.getLabels();
}
public synchronized void finish() {
if (shutdownHook!=null) {
logger.warn("Activity was interrupted by process exit, shutting down");
}
shutdownHook=null;
stoppedAt = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.interval(startedAt, stoppedAt)
.layer(Layer.Activity)
.detail("event", "stop-activity")
.detail("params", activityDef.toString())
.build());
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
public class ActivityExecutorShutdownHook extends Thread {
private final ActivityExecutor activityExecutor;
public ActivityExecutorShutdownHook(ActivityExecutor activityExecutor) {
this.activityExecutor = activityExecutor;
}
@Override
public void run() {
activityExecutor.finish();
}
}

View File

@@ -351,8 +351,7 @@ public class Scenario implements Callable<ExecutionMetricsResult>, NBLabeledElem
.element(this)
.interval(startedAtMillis, this.endedAtMillis)
.layer(Layer.Scenario)
// .labels("state", state.toString())
.detail("command_line", commandLine)
.detail("event","stop-scenario")
.build();
Annotators.recordAnnotation(annotation);

View File

@@ -15,8 +15,6 @@
*/
package io.nosqlbench.engine.core.lifecycle.scenario;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
@@ -24,10 +22,12 @@ import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.activity.*;
import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesExceptionHandler;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityLoader;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityRuntimeInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -75,14 +75,6 @@ public class ScenarioController implements NBLabeledElement {
private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this);
Annotators.recordAnnotation(Annotation.newBuilder()
.element(activity)
.now()
.layer(Layer.Activity)
.detail("params", activityDef.toString())
.build());
ActivityExecutor executor = new ActivityExecutor(activity, this.scenario.getScenarioName());
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
@@ -177,16 +169,7 @@ public class ScenarioController implements NBLabeledElement {
}
scenariologger.debug("STOP {}", activityDef.getAlias());
runtimeInfo.stopActivity();
Annotators.recordAnnotation(Annotation.newBuilder()
.element(runtimeInfo.getActivity())
.now()
.layer(Layer.Activity)
.detail("command", "stop")
.detail("params", activityDef.toString())
.build());
}
/**
@@ -237,15 +220,6 @@ public class ScenarioController implements NBLabeledElement {
if (null == runtimeInfo) {
throw new RuntimeException("could not force stop missing activity:" + activityDef);
}
Annotators.recordAnnotation(Annotation.newBuilder()
.element(runtimeInfo.getActivity())
.now()
.layer(Layer.Activity)
.detail("command", "forceStop")
.detail("params", activityDef.toString())
.build());
scenariologger.debug("FORCE STOP {}", activityDef.getAlias());
runtimeInfo.forceStopActivity();

View File

@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
@@ -189,11 +190,32 @@ public class ActivityMetrics {
return registry;
}
/**
* This variant creates a named metric for all of the stats which may be needed, name with metricname_average,
* and so on. It uses the same data reservoir for all views, but only returns one of them as a handle to the metric.
* This has the effect of leaving some of the metric objects unreferencable from the caller side. This may need
* to be changed in a future update in the even that full inventory management is required on metric objects here.
* @param parent The labeled element the metric pertains to
* @param metricFamilyName The name of the measurement
* @return One of the created metrics, suitable for calling {@link DoubleSummaryGauge#accept(double)} on.
*/
public static DoubleSummaryGauge summaryGauge(NBLabeledElement parent, String metricFamilyName) {
DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
DoubleSummaryGauge anyGauge = null;
for (DoubleSummaryGauge.Stat statName: DoubleSummaryGauge.Stat.values()){
final NBLabels labels = parent.getLabels()
.andTypes("name",sanitize(metricFamilyName))
.modifyValue("name", n -> n+"_"+statName.name().toLowerCase());
anyGauge= (DoubleSummaryGauge) register(labels, () -> new DoubleSummaryGauge(labels,statName,stats));
}
return anyGauge;
}
@SuppressWarnings("unchecked")
public static <T> Gauge<T> gauge(NBLabeledElement parent, String metricFamilyName, Gauge<T> gauge) {
final NBLabels labels = parent.getLabels().andTypes("name",sanitize(metricFamilyName));
return (Gauge<T>) register(labels, () -> new NBMetricGauge(labels,gauge));
return (Gauge<T>) register(labels, () -> new NBMetricGaugeWrapper<>(labels,gauge));
}
private static MetricRegistry lookupRegistry() {

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.engine.metrics;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricGauge;
import java.util.DoubleSummaryStatistics;
/**
* Create a discrete stat reservoir as a gauge.
*/
public class DoubleSummaryGauge implements NBMetricGauge<Double> {
private final NBLabels labels;
private final Stat stat;
private final DoubleSummaryStatistics stats;
public enum Stat {
Min,
Max,
Average,
Count,
Sum
}
public DoubleSummaryGauge(NBLabels labels, Stat stat, DoubleSummaryStatistics stats) {
this.labels = labels;
this.stat = stat;
this.stats = stats;
}
public DoubleSummaryGauge(NBLabels labels, Stat stat) {
this.labels = labels;
this.stat = stat;
this.stats = new DoubleSummaryStatistics();
}
public synchronized void accept(double value) {
stats.accept(value);
}
@Override
public synchronized Double getValue() {
return switch(stat) {
case Min -> stats.getMin();
case Max -> stats.getMax();
case Average -> stats.getAverage();
case Count -> (double) stats.getCount();
case Sum -> stats.getSum();
};
}
@Override
public NBLabels getLabels() {
return labels;
}
}

View File

@@ -18,25 +18,7 @@ package io.nosqlbench.api.engine.metrics.instruments;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
public class NBMetricGauge<T> implements Gauge<T>, NBLabeledElement {
public interface NBMetricGauge<T> extends Gauge<T>, NBLabeledElement {
private final Gauge<? extends T> gauge;
private final NBLabels labels;
public NBMetricGauge(NBLabels labels, Gauge<? extends T> gauge) {
this.gauge = gauge;
this.labels = labels;
}
@Override
public T getValue() {
return gauge.getValue();
}
@Override
public NBLabels getLabels() {
return labels;
}
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.api.engine.metrics.instruments;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.config.NBLabels;
public class NBMetricGaugeWrapper<T> implements NBMetricGauge<T> {
private final Gauge<? extends T> gauge;
private final NBLabels labels;
public NBMetricGaugeWrapper(NBLabels labels, Gauge<? extends T> gauge) {
this.gauge = gauge;
this.labels = labels;
}
@Override
public T getValue() {
return gauge.getValue();
}
@Override
public NBLabels getLabels() {
return labels;
}
}

View File

@@ -80,16 +80,21 @@ public class PromExpositionFormat {
}
if (metric instanceof final Sampling sampling) {
// Use the summary form
buffer.append("# TYPE ").append(labels.valueOf("name")).append(" summary\n");
buffer.append("# TYPE ").append(labels.valueOf("name")).append(" histogram\n");
final Snapshot snapshot = sampling.getSnapshot();
for (final double quantile : new double[]{0.5, 0.75, 0.90, 0.95, 0.98, 0.99, 0.999}) {
final double value = snapshot.getValue(quantile);
buffer
.append(labels.andTypes("quantile", String.valueOf(quantile)).linearize("name"))
.append(labels.modifyValue("name",n -> n+"_bucket").andTypes("le", String.valueOf(quantile)).linearize("name"))
// .append(labels.andTypes("quantile", String.valueOf(quantile)).linearize("name"))
.append(' ')
.append(value)
.append('\n');
}
buffer.append(labels.modifyValue("name",n->n+"_bucket").andTypes("le","+Inf").linearize("name"))
.append(' ')
.append(snapshot.getMax())
.append('\n');
final double snapshotCount =snapshot.size();
buffer.append(labels.modifyValue("name",n->n+"_count").linearize("name"))
.append(' ')

View File

@@ -188,6 +188,10 @@ public class SystemId {
}
public static String genSessionCode(long epochMillis) {
return packLong(epochMillis);
}
public static String genSessionAndNodeCode(long epochMillis) {
return packLong(epochMillis) + "_" + getPackedNodeId();
}

View File

@@ -69,14 +69,15 @@ public class PromExpositionFormatTest {
assertThat(formatted).matches(Pattern.compile("""
# TYPE mynameismud_total counter
mynameismud_total\\{label3="value3"} 0 \\d+
# TYPE mynameismud summary
mynameismud\\{label3="value3",quantile="0.5"} 18463.0
mynameismud\\{label3="value3",quantile="0.75"} 27727.0
mynameismud\\{label3="value3",quantile="0.9"} 33279.0
mynameismud\\{label3="value3",quantile="0.95"} 35135.0
mynameismud\\{label3="value3",quantile="0.98"} 36223.0
mynameismud\\{label3="value3",quantile="0.99"} 36607.0
mynameismud\\{label3="value3",quantile="0.999"} 36927.0
# TYPE mynameismud histogram
mynameismud_bucket\\{label3="value3",le="0.5"} 18463.0
mynameismud_bucket\\{label3="value3",le="0.75"} 27727.0
mynameismud_bucket\\{label3="value3",le="0.9"} 33279.0
mynameismud_bucket\\{label3="value3",le="0.95"} 35135.0
mynameismud_bucket\\{label3="value3",le="0.98"} 36223.0
mynameismud_bucket\\{label3="value3",le="0.99"} 36607.0
mynameismud_bucket\\{label3="value3",le="0.999"} 36927.0
mynameismud_bucket\\{label3="value3",le="\\+Inf"} 36991
mynameismud_count\\{label3="value3"} 1000.0
# TYPE mynameismud_max gauge
mynameismud_max\\{label3="value3"} 36991
@@ -102,14 +103,15 @@ public class PromExpositionFormatTest {
assertThat(formatted).matches(Pattern.compile("""
# TYPE monsieurmarius_total counter
monsieurmarius_total\\{label4="value4"} 1000 \\d+
# TYPE monsieurmarius summary
monsieurmarius\\{label4="value4",quantile="0.5"} 18463.0
monsieurmarius\\{label4="value4",quantile="0.75"} 27727.0
monsieurmarius\\{label4="value4",quantile="0.9"} 33279.0
monsieurmarius\\{label4="value4",quantile="0.95"} 35135.0
monsieurmarius\\{label4="value4",quantile="0.98"} 36223.0
monsieurmarius\\{label4="value4",quantile="0.99"} 36607.0
monsieurmarius\\{label4="value4",quantile="0.999"} 36927.0
# TYPE monsieurmarius histogram
monsieurmarius_bucket\\{label4="value4",le="0.5"} 18463.0
monsieurmarius_bucket\\{label4="value4",le="0.75"} 27727.0
monsieurmarius_bucket\\{label4="value4",le="0.9"} 33279.0
monsieurmarius_bucket\\{label4="value4",le="0.95"} 35135.0
monsieurmarius_bucket\\{label4="value4",le="0.98"} 36223.0
monsieurmarius_bucket\\{label4="value4",le="0.99"} 36607.0
monsieurmarius_bucket\\{label4="value4",le="0.999"} 36927.0
monsieurmarius_bucket\\{label4="value4",le="\\+Inf"} 36991
monsieurmarius_count\\{label4="value4"} 1000.0
# TYPE monsieurmarius_max gauge
monsieurmarius_max\\{label4="value4"} 36991
@@ -152,7 +154,7 @@ public class PromExpositionFormatTest {
@Test
public void testGaugeFormat() {
Gauge cosetteGauge = () -> 1500;
NBMetricGauge nbMetricGauge = new NBMetricGauge(NBLabels.forKV("name","cosette","label6", "value6"), cosetteGauge);
NBMetricGauge nbMetricGauge = new NBMetricGaugeWrapper(NBLabels.forKV("name","cosette","label6", "value6"), cosetteGauge);
String formatted = PromExpositionFormat.format(nowclock, nbMetricGauge);
assertThat(formatted).matches(Pattern.compile("""
@@ -161,7 +163,7 @@ public class PromExpositionFormatTest {
"""));
Gauge cosetteGauge2 = () -> "2000.0";
NBMetricGauge nbMetricGauge2 = new NBMetricGauge(NBLabels.forKV("name","cosette2","label7", "value7"), cosetteGauge2);
NBMetricGauge nbMetricGauge2 = new NBMetricGaugeWrapper(NBLabels.forKV("name","cosette2","label7", "value7"), cosetteGauge2);
String formatted2 = PromExpositionFormat.format(nowclock, nbMetricGauge2);
assertThat(formatted2).matches(Pattern.compile("""
@@ -172,7 +174,7 @@ public class PromExpositionFormatTest {
final int number = 3000;
final CharSequence charSequence = Integer.toString(number);
Gauge cosetteGauge3 = () -> charSequence;
NBMetricGauge nbMetricGauge3 = new NBMetricGauge(NBLabels.forKV("name","cosette3","label8", "value8"), cosetteGauge3);
NBMetricGauge nbMetricGauge3 = new NBMetricGaugeWrapper(NBLabels.forKV("name","cosette3","label8", "value8"), cosetteGauge3);
String formatted3 = PromExpositionFormat.format(nowclock, nbMetricGauge3);
assertThat(formatted3).matches(Pattern.compile("""

View File

@@ -63,7 +63,7 @@ public class SystemIdTest {
@Test
public void testGenSessionCode() {
String sessionCode=SystemId.genSessionCode(234L);
assertThat(sessionCode).matches("[0-9a-zA-Z~-]+_[0-9a-zA-Z~-]+");
assertThat(sessionCode).matches("[0-9a-zA-Z~-]+");
logger.info("session code: " + sessionCode);
}