mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
working test config
This commit is contained in:
parent
d9a83ee788
commit
c619bf705a
@ -1,3 +1,4 @@
|
||||
min_version: 5.21
|
||||
description: |
|
||||
This is a template for live vector search testing.
|
||||
|
||||
@ -17,7 +18,7 @@ scenarios:
|
||||
cassandra:
|
||||
drop: run tags='block:drop' threads==undef cycles==undef
|
||||
# nb5 cql-vector2 cassandra.schema host=localhost localdc=datacenter1 dimensions=100
|
||||
schema: run tags='block:schema' threads==undef cycles==undef
|
||||
schema: run tags='op=create_.*' threads==undef cycles==undef
|
||||
# nb5 cql-vector2 cassandra.rampup host=localhost localdc=datacenter1 dimensions=100 trainsize=1000000 dataset=glove-100-angular rate=10000
|
||||
rampup: run tags='block:rampup' threads=auto cycles=TEMPLATE(trainsize,set-the-trainsize) errors=counter,warn
|
||||
# nb5 cql-vector2 cassandra.search_and_index testsize=10000 host=localhost localdc=datacenter1 dimensions=100 dataset=glove-100-angular --report-csv-to rmetrics:.*:5s
|
||||
@ -25,16 +26,14 @@ scenarios:
|
||||
run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:cassandra'
|
||||
cycles=TEMPLATE(testsize) errors=counter,warn threads=1
|
||||
astra_vectors:
|
||||
drop: run tags='block:drop' labels='target:astra' threads==undef cycles==undef driverconfig=app.conf
|
||||
schema: run tags='block:schema' labels='target:astra' threads==undef cycles==undef
|
||||
rampup: run tags='block:rampup' labels='target:astra' threads=100 cycles=TEMPLATE(trainsize) errors=counter
|
||||
drop: run tags='block:drop' tags='block:drop' threads==undef cycles==undef
|
||||
schema: run tags='block:schema' tags='op=create_.*(table|index)' threads==undef cycles==undef dimensions==TEMPLATE(dimensions,25)
|
||||
train: run tags='block:rampup' threads=auto cycles=TEMPLATE(trainsize) errors=counter,warn maxtries=2 dimensions==TEMPLATE(dimensions,25)
|
||||
# search_and_index_unthrottled: >-
|
||||
# run tags='block:search_and_index,optype=select' labels='target:astra'
|
||||
# cycles=TEMPLATE(testsize) threads=10 errors=count,retry stride=500 errors=counter
|
||||
search_and_index: >-
|
||||
run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:astra'
|
||||
cycles=TEMPLATE(testsize) errors=count,retry stride=100 striderate=7.50
|
||||
errors=counter threads=500
|
||||
testann: >-
|
||||
run tags='block:testann' cycles=TEMPLATE(testsize) errors=count,retry maxtries=2 threads=auto
|
||||
# one activity or two? data leap-frog? or concurrency separate for both?
|
||||
# await_index: run tags='block:await_index' # This would need to exit when a condition is met
|
||||
# stop_search_and_index: stop search_and_index
|
||||
@ -50,10 +49,10 @@ params:
|
||||
bindings:
|
||||
id: ToString()
|
||||
# This
|
||||
test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test"); ToCqlVector();
|
||||
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors")
|
||||
distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/distance")
|
||||
train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train"); ToCqlVector();
|
||||
test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test"); ToCqlVector();
|
||||
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors")
|
||||
distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/distance")
|
||||
train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"); ToCqlVector();
|
||||
synthetic_vectors: HashedFloatVectors(TEMPLATE(dimensions));
|
||||
|
||||
blocks:
|
||||
@ -75,23 +74,18 @@ blocks:
|
||||
raw: |
|
||||
CREATE KEYSPACE IF NOT EXISTS TEMPLATE(keyspace,baselines)
|
||||
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
|
||||
target: cassandra
|
||||
create_table:
|
||||
raw: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
|
||||
key TEXT,
|
||||
value vector<float,TEMPLATE(dimensions)>,
|
||||
value vector<float,TEMPLATE(dimensions,set-the-dimensions-template-var)>,
|
||||
PRIMARY KEY (key)
|
||||
);
|
||||
tags:
|
||||
target: astra
|
||||
create_sai_index:
|
||||
raw: |
|
||||
CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex'
|
||||
WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'};
|
||||
# WITH OPTIONS = {'maximum_node_connections' : TEMPLATE(M,16), 'construction_beam_width' : TEMPLATE(ef,100), 'similarity_function' : 'TEMPLATE(similarity_function,dot_product)'};
|
||||
tags:
|
||||
target: astra
|
||||
rampup:
|
||||
params:
|
||||
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
|
||||
@ -102,7 +96,7 @@ blocks:
|
||||
(key, value) VALUES ({id},{train_floatlist});
|
||||
# await_index:
|
||||
# ops:
|
||||
search_and_index:
|
||||
testann:
|
||||
ops:
|
||||
select_ann_limit_TEMPLATE(k,100):
|
||||
prepared: |
|
||||
|
@ -107,7 +107,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> params) {
|
||||
return OpsLoader.loadString("noop: noop", OpTemplateFormat.inline, params,null).getOps();
|
||||
return OpsLoader.loadString("noop: noop", OpTemplateFormat.inline, params,null).getOps(true);
|
||||
// return OpsLoader.loadString("log:level=INFO", OpTemplateFormat.inline, params,null).getOps();
|
||||
}
|
||||
|
||||
|
@ -57,7 +57,7 @@ public class HttpOpMapperTest {
|
||||
|
||||
private static ParsedOp parsedOpFor(final String yaml) {
|
||||
final OpsDocList docs = OpsLoader.loadString(yaml, OpTemplateFormat.yaml, Map.of(), null);
|
||||
final OpTemplate opTemplate = docs.getOps().get(0);
|
||||
final OpTemplate opTemplate = docs.getOps(true).get(0);
|
||||
final ParsedOp parsedOp = new ParsedOp(opTemplate, HttpOpMapperTest.cfg, List.of(HttpOpMapperTest.adapter.getPreprocessor()), new TestComponent("parent","parent"));
|
||||
return parsedOp;
|
||||
}
|
||||
|
@ -99,8 +99,9 @@ public class OpDef extends OpTemplate {
|
||||
|
||||
private LinkedHashMap<String, String> composeTags() {
|
||||
LinkedHashMap<String, String> tagsWithName = new LinkedHashMap<>(new MultiMapLookup<>(rawOpDef.getTags(), block.getTags()));
|
||||
tagsWithName.put("name",getName());
|
||||
tagsWithName.put("block",block.getName());
|
||||
tagsWithName.put("name",getName());
|
||||
tagsWithName.put("op",this.rawOpDef.getName());
|
||||
return tagsWithName;
|
||||
}
|
||||
|
||||
|
@ -58,8 +58,8 @@ public class OpsDocList implements Iterable<OpsDoc> {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<OpTemplate> getOps() {
|
||||
return getOps("");
|
||||
public List<OpTemplate> getOps(boolean logit) {
|
||||
return getOps("", logit);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,19 +67,32 @@ public class OpsDocList implements Iterable<OpsDoc> {
|
||||
* @return The list of all included op templates for all included blocks of in this document,
|
||||
* including the inherited and overridden values from this doc and the parent block.
|
||||
*/
|
||||
public List<OpTemplate> getOps(String tagFilterSpec) {
|
||||
public List<OpTemplate> getOps(String tagFilterSpec, boolean logit) {
|
||||
TagFilter ts = new TagFilter(tagFilterSpec);
|
||||
List<OpTemplate> opTemplates = new ArrayList<>();
|
||||
|
||||
getStmtDocs().stream()
|
||||
.flatMap(d -> d.getOpTemplates().stream())
|
||||
.filter(ts::matchesTagged)
|
||||
List<OpTemplate> rawtemplates = getStmtDocs().stream()
|
||||
.flatMap(d -> d.getOpTemplates().stream()).toList();
|
||||
|
||||
List<String> matchlog = new ArrayList<>();
|
||||
rawtemplates.stream()
|
||||
.map(ts::matchesTaggedResult)
|
||||
.peek(r -> matchlog.add(r.getLog()))
|
||||
.filter(TagFilter.Result::matched)
|
||||
.map(TagFilter.Result::getElement)
|
||||
.forEach(opTemplates::add);
|
||||
|
||||
if (logit) {
|
||||
for (String s : matchlog) {
|
||||
logger.info(s);
|
||||
}
|
||||
}
|
||||
|
||||
return opTemplates;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Iterator<OpsDoc> iterator() {
|
||||
return getStmtDocs().iterator();
|
||||
|
@ -381,7 +381,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "namedblock1__op1",
|
||||
"block": "namedblock1"
|
||||
"block": "namedblock1",
|
||||
"op": "op1"
|
||||
}
|
||||
},
|
||||
{
|
||||
@ -392,7 +393,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "namedblock1__op2",
|
||||
"block": "namedblock1"
|
||||
"block": "namedblock1",
|
||||
"op": "op2"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -454,7 +456,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block1__op1",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "op1"
|
||||
}
|
||||
},
|
||||
{
|
||||
@ -465,7 +468,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block1__op2",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "op2"
|
||||
}
|
||||
},
|
||||
{
|
||||
@ -475,7 +479,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "this_is_block_2__op3",
|
||||
"block": "this_is_block_2"
|
||||
"block": "this_is_block_2",
|
||||
"op": "op3"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -524,7 +529,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "myblock__stmt1",
|
||||
"block": "myblock"
|
||||
"block": "myblock",
|
||||
"op": "stmt1"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
@ -40,7 +40,8 @@ op: select * from bar.table;
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__stmt1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -78,7 +79,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__stmt1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -121,7 +123,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -163,7 +166,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -201,7 +205,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -242,7 +247,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -317,9 +323,9 @@ ops:
|
||||
"prepared": false
|
||||
},
|
||||
"tags": {
|
||||
"block": "schema",
|
||||
"block": "block0",
|
||||
"name": "block0__special-op-name",
|
||||
"block": "block0"
|
||||
"op": "special-op-name"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -416,10 +422,10 @@ blocks:
|
||||
"prepared": false
|
||||
},
|
||||
"tags": {
|
||||
"block": "schema",
|
||||
"block": "block_named_fred",
|
||||
"docleveltag": "is-tagging-everything",
|
||||
"name": "block_named_fred__special-op-name",
|
||||
"block": "block_named_fred"
|
||||
"op": "special-op-name"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
@ -40,7 +40,8 @@ ops: "cycle number '{{NumberNameToString}}'"
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__stmt1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "cycle number '{{NumberNameToString}}'"
|
||||
@ -83,7 +84,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__stmt1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "even cycle '{{NumberNameToString}}'"
|
||||
@ -93,7 +95,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__stmt2",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt2"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "odd cycle '{{NumberNameToString}}'"
|
||||
@ -136,7 +139,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__myop1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "myop1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "even cycle '{{NumberNameToString}}'"
|
||||
@ -146,7 +150,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__myop2",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "myop2"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "odd cycle '{{NumberNameToString}}'"
|
||||
@ -194,7 +199,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"opfield1": "opvalue1",
|
||||
@ -245,7 +251,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": [
|
||||
@ -304,7 +311,8 @@ ops:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"index_map": {
|
||||
@ -377,7 +385,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__op1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"index_map": {
|
||||
|
@ -48,7 +48,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__special-op-name"
|
||||
"name": "block0__special-op-name",
|
||||
"op": "special-op-name"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -100,7 +101,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__op1"
|
||||
"name": "block0__op1",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -146,7 +148,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__op1"
|
||||
"name": "block0__op1",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -193,12 +196,13 @@ ops:
|
||||
"field1": "select * from ks1.tb1;",
|
||||
"field2": "field 2 value"
|
||||
},
|
||||
params: {
|
||||
"params": {
|
||||
"paramname1": "paramvalue1"
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__op1"
|
||||
"name": "block0__op1",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -250,7 +254,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__op1"
|
||||
"name": "block0__op1",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -307,7 +312,8 @@ ops:
|
||||
},
|
||||
"tags": {
|
||||
"block": "block0",
|
||||
"name": "block0__op1"
|
||||
"name": "block0__op1",
|
||||
"op": "op1"
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -346,7 +352,8 @@ ops: "my test op"
|
||||
},
|
||||
"tags": {
|
||||
"name": "block0__stmt1",
|
||||
"block": "block0"
|
||||
"block": "block0",
|
||||
"op": "stmt1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "my test op"
|
||||
@ -395,7 +402,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block1__stmt1",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "stmt1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "my test op"
|
||||
@ -451,7 +459,8 @@ blocks:
|
||||
},
|
||||
"tags": {
|
||||
"name": "block1__op1",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "my test op"
|
||||
@ -512,7 +521,8 @@ blocks:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block1__op1",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "my test op",
|
||||
@ -574,7 +584,8 @@ blocks:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block1__op1",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "op1"
|
||||
},
|
||||
"op": {
|
||||
"stmt": "my test op"
|
||||
@ -584,10 +595,12 @@ blocks:
|
||||
{
|
||||
"tags": {
|
||||
"name": "block1__params",
|
||||
"block": "block1"
|
||||
"block": "block1",
|
||||
"op": "params"
|
||||
},
|
||||
"op": {
|
||||
"pname": "pvalue"
|
||||
|
||||
},
|
||||
"name": "block1__params"
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ public class YamlSpecValidator implements STAssemblyValidator {
|
||||
List<Map<String, Object>> expectedList = gson.fromJson(json, type);
|
||||
|
||||
OpsDocList stmtsDocs = OpsLoader.loadString(yaml, OpTemplateFormat.yaml, new HashMap<>(), null);
|
||||
List<OpTemplate> stmts = stmtsDocs.getOps();
|
||||
List<OpTemplate> stmts = stmtsDocs.getOps(false);
|
||||
List<Map<String, Object>> stmt_objs = stmts.stream().map(OpTemplate::asData).collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
|
@ -47,7 +47,7 @@ public class OpDetailOverrideTest {
|
||||
assertThat(s.getStmt()).contains("globalstatement1");
|
||||
assertThat(s.getBindings()).hasSize(1);
|
||||
assertThat(s.getParams()).hasSize(1);
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","block0","global_tag1","tag value","name","block0__stmt1"));
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","block0","global_tag1","tag value","name","block0__stmt1", "op","stmt1"));
|
||||
|
||||
OpsBlock doc1block1 = doc1.getBlocks().get(1);
|
||||
List<OpTemplate> ops = doc1block1.getOps();
|
||||
@ -56,14 +56,14 @@ public class OpDetailOverrideTest {
|
||||
s = ops.get(0);
|
||||
assertThat(s.getName()).isEqualTo("testblock1__stmt1");
|
||||
assertThat(s.getStmt()).contains("astatement1");
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__stmt1"));
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__stmt1","op","stmt1"));
|
||||
assertThat(s.getBindings()).hasSize(1);
|
||||
assertThat(s.getParams()).hasSize(1);
|
||||
|
||||
s = ops.get(1);
|
||||
assertThat(s.getName()).isEqualTo("testblock1__s2name");
|
||||
assertThat(s.getStmt()).contains("s2statement data");
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__s2name"));
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__s2name","op","s2name"));
|
||||
assertThat(s.getBindings()).hasSize(1);
|
||||
assertThat(s.getParams()).hasSize(1);
|
||||
|
||||
@ -78,7 +78,7 @@ public class OpDetailOverrideTest {
|
||||
s = ops.get(3);
|
||||
assertThat(s.getName()).isEqualTo("testblock1__s4");
|
||||
assertThat(s.getStmt()).contains("statement 4");
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__s4"));
|
||||
assertThat(s.getTags()).isEqualTo(Map.of("block","testblock1","global_tag1","tag value","name","testblock1__s4","op","s4"));
|
||||
assertThat(s.getBindings()).hasSize(1);
|
||||
assertThat(s.getParams()).hasSize(1);
|
||||
|
||||
|
@ -130,9 +130,9 @@ public class OpsDocListTest {
|
||||
|
||||
@Test
|
||||
public void testFilteredStmts() {
|
||||
List<OpTemplate> stmts = doclist.getOps("");
|
||||
List<OpTemplate> stmts = doclist.getOps("",true);
|
||||
Assertions.assertThat(stmts).hasSize(6);
|
||||
stmts = doclist.getOps("root1:value23");
|
||||
stmts = doclist.getOps("root1:value23",true);
|
||||
Assertions.assertThat(stmts).hasSize(2);
|
||||
}
|
||||
|
||||
|
@ -82,8 +82,8 @@ public class ParsedOpTest {
|
||||
ps1: "param-one"
|
||||
""";
|
||||
final OpsDocList stmtsDocs = OpsLoader.loadString(opt, OpTemplateFormat.yaml, cfg.getMap(), null);
|
||||
assertThat(stmtsDocs.getOps().size()).isEqualTo(1);
|
||||
final OpTemplate opTemplate = stmtsDocs.getOps().get(0);
|
||||
assertThat(stmtsDocs.getOps(true).size()).isEqualTo(1);
|
||||
final OpTemplate opTemplate = stmtsDocs.getOps(true).get(0);
|
||||
final ParsedOp parsedOp = new ParsedOp(opTemplate, cfg, List.of(), getParent());
|
||||
|
||||
assertThat(parsedOp.getAsFunctionOr("d1", "invalid").apply(1L)).isEqualTo("one");
|
||||
|
@ -31,7 +31,6 @@ import org.apache.logging.log4j.Logger;
|
||||
public class CountErrorHandler extends CounterErrorHandler {
|
||||
|
||||
public CountErrorHandler() {
|
||||
logger.warn("Starting with v4.17 onward, use 'counter'. See cql-errors.md for usage.");
|
||||
}
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(CountErrorHandler.class);
|
||||
|
@ -528,8 +528,8 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
|
||||
|
||||
OpsDocList opsDocList = loadStmtsDocList();
|
||||
|
||||
List<OpTemplate> unfilteredOps = opsDocList.getOps();
|
||||
List<OpTemplate> filteredOps = opsDocList.getOps(tagfilter);
|
||||
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
|
||||
List<OpTemplate> filteredOps = opsDocList.getOps(tagfilter, true);
|
||||
|
||||
if (0 == filteredOps.size()) {
|
||||
// There were no ops, and it *wasn't* because they were all filtered out.
|
||||
@ -555,6 +555,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
|
||||
3) driver=stdout (or any other drive that can synthesize ops)""");
|
||||
}
|
||||
if (0 == filteredOps.size()) {
|
||||
|
||||
throw new BasicError("There were no active op templates with tag filter '" + tagfilter + '\'');
|
||||
}
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ public class CommandTemplateTest {
|
||||
OpsDocList opsDocs = OpsLoader.loadString("ops:\n" +
|
||||
" - s1: test1=foo test2=bar",
|
||||
OpTemplateFormat.yaml, Map.of(), null);
|
||||
OpTemplate optpl = opsDocs.getOps().get(0);
|
||||
OpTemplate optpl = opsDocs.getOps(true).get(0);
|
||||
CommandTemplate ct = new CommandTemplate(optpl);
|
||||
assertThat(ct.isStatic()).isTrue();
|
||||
}
|
||||
@ -53,7 +53,7 @@ public class CommandTemplateTest {
|
||||
" bar: NumberNameToString();\n",
|
||||
OpTemplateFormat.yaml, Map.of(), null
|
||||
);
|
||||
OpTemplate optpl = stmtsDocs.getOps().get(0);
|
||||
OpTemplate optpl = stmtsDocs.getOps(true).get(0);
|
||||
CommandTemplate ct = new CommandTemplate(optpl);
|
||||
String format = gson.toJson(ct);
|
||||
logger.debug(format);
|
||||
|
@ -27,6 +27,8 @@ import io.nosqlbench.api.content.NBIO;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.CsvReporter;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.MetricInstanceFilter;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.PromPushReporterComponent;
|
||||
import io.nosqlbench.api.engine.util.Unit;
|
||||
import io.nosqlbench.api.errors.BasicError;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
@ -42,14 +44,8 @@ import io.nosqlbench.engine.cli.NBCLIOptions.Mode;
|
||||
import io.nosqlbench.engine.core.annotation.Annotators;
|
||||
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
|
||||
import io.nosqlbench.engine.core.clientload.ClientSystemMetricChecker;
|
||||
import io.nosqlbench.engine.core.clientload.DiskStatsReader;
|
||||
import io.nosqlbench.engine.core.clientload.LoadAvgReader;
|
||||
import io.nosqlbench.engine.core.clientload.MemInfoReader;
|
||||
import io.nosqlbench.engine.core.clientload.NetDevReader;
|
||||
import io.nosqlbench.engine.core.clientload.StatReader;
|
||||
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
|
||||
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
|
||||
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
|
||||
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
|
||||
import io.nosqlbench.engine.core.logging.LoggerConfig;
|
||||
import io.nosqlbench.engine.core.metadata.MarkdownFinder;
|
||||
@ -58,11 +54,13 @@ import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.config.ConfigurationFactory;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -210,12 +208,7 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
|
||||
}
|
||||
}
|
||||
|
||||
String reportGraphiteTo = globalOptions.wantsReportGraphiteTo();
|
||||
String annotatorsConfig = globalOptions.getAnnotatorsConfig();
|
||||
String promPushConfig = globalOptions.getPromPushConfig();
|
||||
final String reportPromPushTo = globalOptions.wantsReportPromPushTo();
|
||||
|
||||
String graphiteMetricsAddress = null;
|
||||
|
||||
if (annotatorsConfig == null || annotatorsConfig.isBlank()) {
|
||||
List<Map<String, String>> annotatorsConfigs = new ArrayList<>();
|
||||
@ -224,16 +217,6 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
|
||||
"level", "info"
|
||||
));
|
||||
|
||||
if (null != graphiteMetricsAddress) {
|
||||
reportGraphiteTo = graphiteMetricsAddress + ":9109";
|
||||
annotatorsConfigs.add(Map.of(
|
||||
"type", "grafana",
|
||||
"baseurl", "http://" + graphiteMetricsAddress + ":3000",
|
||||
"tags", "appname:nosqlbench",
|
||||
"timeoutms", "5000",
|
||||
"onerror", "warn"
|
||||
));
|
||||
}
|
||||
Gson gson = new GsonBuilder().create();
|
||||
annotatorsConfig = gson.toJson(annotatorsConfigs);
|
||||
}
|
||||
@ -413,6 +396,20 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
|
||||
new CsvReporter(session,Path.of(cfg.file), cfg.millis, filter);
|
||||
});
|
||||
|
||||
options.wantsReportPromPushTo().ifPresent(cfg -> {
|
||||
String[] words = cfg.split(",");
|
||||
String uri;
|
||||
long intervalMs=10_000L;
|
||||
|
||||
switch (words.length) {
|
||||
case 2: intervalMs= Unit.msFor(words[1]).orElseThrow(() -> new RuntimeException("can't parse '" + words[1] + "!"));
|
||||
case 1: uri = words[0];
|
||||
break;
|
||||
default: throw new RuntimeException("Unable to parse '" + cfg + "', must be in <URI> or <URI>,ms form");
|
||||
}
|
||||
session.create().pushReporter(uri,intervalMs,NBLabels.forKV());
|
||||
});
|
||||
|
||||
|
||||
|
||||
ExecutionResult sessionResult = session.apply(options.getCommands());
|
||||
|
@ -57,7 +57,6 @@ public class NBCLIOptions {
|
||||
private static final String METRICS_LABELSPEC = "--metrics-labelspec";
|
||||
private static final String LABELSPEC = "--labelspec";
|
||||
private static final String ANNOTATORS_CONFIG = "--annotators";
|
||||
private static final String PROMPUSH_CONFIG = "--prompush";
|
||||
|
||||
// Enabled if the TERM env var is provided
|
||||
private static final String ANSI = "--ansi";
|
||||
@ -180,7 +179,6 @@ public class NBCLIOptions {
|
||||
private String scriptFile;
|
||||
private String[] annotateEvents = {"ALL"};
|
||||
private String annotatorsConfig = "";
|
||||
private String promPushConfig = "";
|
||||
private String statedirs = NBStatePath.NB_STATEDIR_PATHS;
|
||||
private Path statepath;
|
||||
private final String hdrForChartFileName = NBCLIOptions.DEFAULT_CHART_HDR_LOG_NAME;
|
||||
@ -211,10 +209,6 @@ public class NBCLIOptions {
|
||||
return this.annotatorsConfig;
|
||||
}
|
||||
|
||||
public String getPromPushConfig() {
|
||||
return this.promPushConfig;
|
||||
}
|
||||
|
||||
public NBLabels getLabelMap() {
|
||||
return this.labels;
|
||||
}
|
||||
@ -387,10 +381,6 @@ public class NBCLIOptions {
|
||||
arglist.removeFirst();
|
||||
this.reportPromPushTo = arglist.removeFirst();
|
||||
break;
|
||||
case NBCLIOptions.PROMPUSH_CONFIG:
|
||||
arglist.removeFirst();
|
||||
promPushConfig = this.readWordOrThrow(arglist, "prompush config");
|
||||
break;
|
||||
case NBCLIOptions.GRAPHITE_LOG_LEVEL:
|
||||
arglist.removeFirst();
|
||||
this.graphitelogLevel = arglist.removeFirst();
|
||||
@ -757,8 +747,8 @@ public class NBCLIOptions {
|
||||
return this.reportGraphiteTo;
|
||||
}
|
||||
|
||||
public String wantsReportPromPushTo() {
|
||||
return this.reportPromPushTo;
|
||||
public Optional<String> wantsReportPromPushTo() {
|
||||
return Optional.ofNullable(this.reportPromPushTo);
|
||||
}
|
||||
|
||||
public String wantsMetricsPrefix() {
|
||||
|
@ -34,12 +34,12 @@ public class NBCLIScenarioParserTemplateVarTest {
|
||||
cmds.forEach(System.out::println);
|
||||
|
||||
OpsDocList workload1 = OpsLoader.loadPath(cmds.get(0).getArg("workload"),cmds.get(0).getParams());
|
||||
OpTemplate optpl1 = workload1.getOps().get(0);
|
||||
OpTemplate optpl1 = workload1.getOps(true).get(0);
|
||||
System.out.println("op from cmd1:"+optpl1);
|
||||
assertThat(optpl1.getStmt()).contains("cycle {cycle} replaced replaced\n");
|
||||
|
||||
OpsDocList workload2 = OpsLoader.loadPath(cmds.get(1).getArg("workload"),cmds.get(1).getParams());
|
||||
OpTemplate optpl2 = workload2.getOps().get(0);
|
||||
OpTemplate optpl2 = workload2.getOps(true).get(0);
|
||||
System.out.println("op from cmd2:"+optpl2);
|
||||
assertThat(optpl2.getStmt()).contains("cycle {cycle} def1 def1\n");
|
||||
}
|
||||
@ -51,7 +51,7 @@ public class NBCLIScenarioParserTemplateVarTest {
|
||||
cmds.forEach(System.out::println);
|
||||
|
||||
OpsDocList workload1 = OpsLoader.loadPath(cmds.get(0).getArg("workload"),cmds.get(0).getParams());
|
||||
OpTemplate optpl1 = workload1.getOps().get(0);
|
||||
OpTemplate optpl1 = workload1.getOps(true).get(0);
|
||||
System.out.println("op from cmd1:"+optpl1);
|
||||
assertThat(optpl1.getStmt()).contains("cycle {cycle} overridden overridden\n");
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ public class TagFilter {
|
||||
*
|
||||
* @return a Result telling whether the tags matched and why or why not
|
||||
*/
|
||||
protected Result matches(Map<String, String> tags) {
|
||||
protected Result<Map<String,String>> matches(Map<String, String> tags) {
|
||||
|
||||
List<String> log = new ArrayList<>();
|
||||
|
||||
@ -229,11 +229,15 @@ public class TagFilter {
|
||||
totalKeyMatches += matchedKey ? 1 : 0;
|
||||
}
|
||||
boolean matched = conjugate.matchfunc.apply(filter.size(),totalKeyMatches);
|
||||
return new Result(matched, log);
|
||||
if (filter.keySet().isEmpty()) {
|
||||
log.add("(<☑>) " + tags.toString() + " : matched empty pattern");
|
||||
}
|
||||
return new Result<>(tags, matched, log);
|
||||
}
|
||||
|
||||
public Result matchesTaggedResult(Tagged item) {
|
||||
return matches(item.getTags());
|
||||
public <T extends Tagged> Result<T> matchesTaggedResult(T item) {
|
||||
Result<Map<String, String>> matches = matches(item.getTags());
|
||||
return new Result<>(item,matches.matched(),matches.matchLog);
|
||||
}
|
||||
|
||||
public boolean matchesTagged(Tagged item) {
|
||||
@ -244,33 +248,26 @@ public class TagFilter {
|
||||
return filter;
|
||||
}
|
||||
|
||||
public static class Result {
|
||||
public static class Result<T> {
|
||||
private final boolean matched;
|
||||
private final List<String> matchLog;
|
||||
private final T element;
|
||||
|
||||
public Result(boolean matched, List<String> log) {
|
||||
public Result(T element, boolean matched, List<String> log) {
|
||||
this.element = element;
|
||||
this.matched = matched;
|
||||
this.matchLog = log;
|
||||
}
|
||||
|
||||
public static Result Matched(String reason) {
|
||||
return new Result(true, new ArrayList<String>() {{
|
||||
add(reason);
|
||||
}});
|
||||
public T getElement() {
|
||||
return element;
|
||||
}
|
||||
|
||||
public static Result Unmatched(String reason) {
|
||||
return new Result(false, new ArrayList<String>() {{
|
||||
add(reason);
|
||||
}});
|
||||
}
|
||||
|
||||
public boolean matched() {
|
||||
return this.matched;
|
||||
}
|
||||
|
||||
public String getLog() {
|
||||
return this.matchLog.stream().collect(Collectors.joining("\n"));
|
||||
return String.join("\n", this.matchLog);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,11 +18,14 @@ package io.nosqlbench.api.engine.metrics.reporters;
|
||||
|
||||
import io.nosqlbench.api.config.standard.*;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.api.system.NBEnvironment;
|
||||
import io.nosqlbench.components.NBBaseComponent;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
import io.nosqlbench.components.PeriodicTaskComponent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpClient.Redirect;
|
||||
@ -39,69 +42,28 @@ import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
|
||||
public class PromPushReporterComponent extends PeriodicTaskComponent implements NBConfigurable {
|
||||
public class PromPushReporterComponent extends PeriodicTaskComponent {
|
||||
private static final Logger logger = LogManager.getLogger(PromPushReporterComponent.class);
|
||||
private final Path keyfilePath;
|
||||
private HttpClient client;
|
||||
private final URI uri;
|
||||
private String bearerToken;
|
||||
private boolean needsAuth;
|
||||
|
||||
public PromPushReporterComponent(
|
||||
final String targetUriSpec,
|
||||
final String config,
|
||||
long millis,
|
||||
NBComponent component,
|
||||
NBLabels labels
|
||||
) {
|
||||
super(component, labels, millis, true);
|
||||
|
||||
uri = URI.create(targetUriSpec);
|
||||
needsAuth = false;
|
||||
ConfigLoader loader = new ConfigLoader();
|
||||
List<Map> configs = loader.load(config, Map.class);
|
||||
NBConfigModel cm = this.getConfigModel();
|
||||
if (configs != null) {
|
||||
logger.info("PromPushReporter process configuration: %s", config);
|
||||
for (Map cmap : configs) {
|
||||
NBConfiguration cfg = cm.apply(cmap);
|
||||
this.applyConfig(cfg);
|
||||
}
|
||||
} else {
|
||||
logger.info("PromPushReporter default configuration");
|
||||
HashMap<String, String> junk = new HashMap<>(Map.of());
|
||||
NBConfiguration cfg = cm.apply(junk);
|
||||
this.applyConfig(cfg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(this.getClass())
|
||||
.add(Param.defaultTo("apikeyfile", "$NBSTATEDIR/prompush/prompush_apikey")
|
||||
.setDescription("The file that contains the api key, supersedes apikey"))
|
||||
.add(Param.optional("apikey", String.class)
|
||||
.setDescription("The api key to use"))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
Path keyfilePath = null;
|
||||
Optional<String> optionalApikeyfile = cfg.getEnvOptional("apikeyfile");
|
||||
Optional<String> optionalApikey = cfg.getOptional("apikey");
|
||||
bearerToken = null;
|
||||
if (optionalApikeyfile.isPresent()) {
|
||||
keyfilePath = optionalApikeyfile.map(Path::of).orElseThrow();
|
||||
public PromPushReporterComponent(NBComponent parent, URI endpoint, long intervalMs, NBLabels nbLabels) {
|
||||
super(parent,nbLabels,intervalMs,true);
|
||||
this.uri = endpoint;
|
||||
this.keyfilePath = NBEnvironment.INSTANCE
|
||||
.interpolateWithTimestamp("$NBSTATEDIR/prompush/prompush_apikey", System.currentTimeMillis())
|
||||
.map(Path::of)
|
||||
.orElseThrow(() -> new RuntimeException("Unable to create path for apikey file: $NBSTATEDIR/prompush/prompush_apikey"));
|
||||
if (Files.isRegularFile(keyfilePath)) {
|
||||
logger.info("Reading Bearer Token from %s", keyfilePath);
|
||||
PromPushKeyFileReader keyfile = new PromPushKeyFileReader(keyfilePath);
|
||||
bearerToken = keyfile.get();
|
||||
try {
|
||||
logger.info("Reading Bearer Token from {}", keyfilePath);
|
||||
this.bearerToken = Files.readString(keyfilePath).trim();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else if (optionalApikey.isPresent()) {
|
||||
bearerToken = optionalApikey.get();
|
||||
}
|
||||
needsAuth = (null != bearerToken);
|
||||
bearerToken = "Bearer " + bearerToken;
|
||||
}
|
||||
|
||||
public void task() {
|
||||
@ -131,8 +93,8 @@ public class PromPushReporterComponent extends PeriodicTaskComponent implements
|
||||
remainingRetries--;
|
||||
final HttpClient client = getCachedClient();
|
||||
final HttpRequest.Builder rb = HttpRequest.newBuilder().uri(uri);
|
||||
if (needsAuth) {
|
||||
rb.setHeader("Authorization", bearerToken);
|
||||
if (bearerToken!=null) {
|
||||
rb.setHeader("Authorization", "Bearer " + bearerToken);
|
||||
}
|
||||
final HttpRequest request = rb.POST(BodyPublishers.ofString(exposition)).build();
|
||||
final BodyHandler<String> handler = HttpResponse.BodyHandlers.ofString();
|
||||
|
@ -1,241 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.api.config.standard.*;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.PromExpositionFormat;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.PromPushKeyFileReader;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpClient.Redirect;
|
||||
import java.net.http.HttpClient.Version;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpRequest.BodyPublishers;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.net.http.HttpResponse.BodyHandler;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class AttachedMetricsPushReporter extends NBBaseComponent implements NBConfigurable, Runnable {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(AttachedMetricsPushReporter.class);
|
||||
private final int intervalSeconds;
|
||||
private HttpClient client;
|
||||
private final URI uri;
|
||||
private String bearerToken;
|
||||
private boolean needsAuth;
|
||||
|
||||
private Lock lock = new ReentrantLock(false);
|
||||
private Condition shutdownSignal = lock.newCondition();
|
||||
|
||||
public AttachedMetricsPushReporter(
|
||||
final String targetUriSpec,
|
||||
NBComponent node,
|
||||
int seconds,
|
||||
NBLabels extraLabels
|
||||
) {
|
||||
super(node, extraLabels);
|
||||
this.intervalSeconds = seconds;
|
||||
uri = URI.create(targetUriSpec);
|
||||
needsAuth = false;
|
||||
|
||||
String config = "";
|
||||
ConfigLoader loader = new ConfigLoader();
|
||||
List<Map> configs = loader.load(config, Map.class);
|
||||
NBConfigModel cm = this.getConfigModel();
|
||||
if (configs != null) {
|
||||
logger.info("PromPushReporter process configuration: %s", config);
|
||||
for (Map cmap : configs) {
|
||||
NBConfiguration cfg = cm.apply(cmap);
|
||||
this.applyConfig(cfg);
|
||||
}
|
||||
} else {
|
||||
logger.info("PromPushReporter default configuration");
|
||||
HashMap<String, String> junk = new HashMap<>(Map.of());
|
||||
NBConfiguration cfg = cm.apply(junk);
|
||||
this.applyConfig(cfg);
|
||||
}
|
||||
|
||||
Thread.ofVirtual().start(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(this.getClass())
|
||||
.add(Param.defaultTo("apikeyfile", "$NBSTATEDIR/prompush/prompush_apikey")
|
||||
.setDescription("The file that contains the api key, supersedes apikey"))
|
||||
.add(Param.optional("apikey", String.class)
|
||||
.setDescription("The api key to use"))
|
||||
.asReadOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyConfig(NBConfiguration cfg) {
|
||||
Path keyfilePath = null;
|
||||
Optional<String> optionalApikeyfile = cfg.getEnvOptional("apikeyfile");
|
||||
Optional<String> optionalApikey = cfg.getOptional("apikey");
|
||||
bearerToken = null;
|
||||
if (optionalApikeyfile.isPresent()) {
|
||||
keyfilePath = optionalApikeyfile.map(Path::of).orElseThrow();
|
||||
if (Files.isRegularFile(keyfilePath)) {
|
||||
logger.info("Reading Bearer Token from %s", keyfilePath);
|
||||
PromPushKeyFileReader keyfile = new PromPushKeyFileReader(keyfilePath);
|
||||
bearerToken = keyfile.get();
|
||||
}
|
||||
} else if (optionalApikey.isPresent()) {
|
||||
bearerToken = optionalApikey.get();
|
||||
}
|
||||
needsAuth = (null != bearerToken);
|
||||
bearerToken = "Bearer " + bearerToken;
|
||||
}
|
||||
|
||||
public synchronized void report() {
|
||||
final Clock nowclock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
|
||||
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024); // 1M pre-allocated to reduce heap churn
|
||||
List<NBMetric> metrics = new ArrayList<>();
|
||||
Iterator<NBComponent> allMetrics = NBComponentTraversal.traverseBreadth(getParent());
|
||||
allMetrics.forEachRemaining(m -> metrics.addAll(m.findComponentMetrics("")));
|
||||
|
||||
int total = 0;
|
||||
for (NBMetric metric : metrics) {
|
||||
sb = PromExpositionFormat.format(nowclock, sb, metric);
|
||||
total++;
|
||||
}
|
||||
AttachedMetricsPushReporter.logger.debug("formatted {} metrics in prom expo format", total);
|
||||
final String exposition = sb.toString();
|
||||
logger.trace(() -> "prom exposition format:\n" + exposition);
|
||||
|
||||
final double backoffRatio = 1.5;
|
||||
final double maxBackoffSeconds = 10;
|
||||
double backOff = 1.0;
|
||||
|
||||
final int maxRetries = 5;
|
||||
int remainingRetries = maxRetries;
|
||||
final List<Exception> errors = new ArrayList<>();
|
||||
boolean succeeded = false;
|
||||
|
||||
while (0 < remainingRetries) {
|
||||
remainingRetries--;
|
||||
final HttpClient client = getCachedClient();
|
||||
final HttpRequest.Builder rb = HttpRequest.newBuilder().uri(uri);
|
||||
if (needsAuth) {
|
||||
rb.setHeader("Authorization", bearerToken);
|
||||
}
|
||||
final HttpRequest request = rb.POST(BodyPublishers.ofString(exposition)).build();
|
||||
final BodyHandler<String> handler = HttpResponse.BodyHandlers.ofString();
|
||||
HttpResponse<String> response = null;
|
||||
try {
|
||||
response = client.send(request, handler);
|
||||
final int status = response.statusCode();
|
||||
if ((200 > status) || (300 <= status)) {
|
||||
final String errmsg = "status " + response.statusCode() + " while posting metrics to '" + this.uri + '\'';
|
||||
throw new RuntimeException(errmsg);
|
||||
}
|
||||
AttachedMetricsPushReporter.logger.debug("posted {} metrics to prom push endpoint '{}'", total, this.uri);
|
||||
succeeded = true;
|
||||
break;
|
||||
} catch (final Exception e) {
|
||||
errors.add(e);
|
||||
try {
|
||||
Thread.sleep((int) backOff * 1000L);
|
||||
} catch (final InterruptedException ignored) {
|
||||
}
|
||||
backOff = Math.min(maxBackoffSeconds, backOff * backoffRatio);
|
||||
}
|
||||
}
|
||||
if (!succeeded) {
|
||||
AttachedMetricsPushReporter.logger.error("Failed to send push prom metrics after {} tries. Errors follow:", maxRetries);
|
||||
for (final Exception error : errors) AttachedMetricsPushReporter.logger.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized HttpClient getCachedClient() {
|
||||
if (null == client) this.client = this.getNewClient();
|
||||
return this.client;
|
||||
}
|
||||
|
||||
private synchronized HttpClient getNewClient() {
|
||||
this.client = HttpClient.newBuilder()
|
||||
.followRedirects(Redirect.NORMAL)
|
||||
.connectTimeout(Duration.ofSeconds(60))
|
||||
.version(Version.HTTP_2)
|
||||
.build();
|
||||
return this.client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long now = System.currentTimeMillis();
|
||||
long reportAt = now + intervalSeconds * 1000L;
|
||||
long waitfor = reportAt - now;
|
||||
|
||||
loop:
|
||||
while (true) {
|
||||
|
||||
while (waitfor > 0) {
|
||||
try {
|
||||
if (shutdownSignal.await(waitfor, TimeUnit.MILLISECONDS)) {
|
||||
logger.debug("shutting down " + this);
|
||||
break loop;
|
||||
}
|
||||
now = System.currentTimeMillis();
|
||||
waitfor = now - reportAt;
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
logger.info("reporting metrics via push");
|
||||
try {
|
||||
report();
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
} finally {
|
||||
reportAt = now;
|
||||
now = System.currentTimeMillis();
|
||||
waitfor = now - reportAt;
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("reporter thread shutting down");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeDetach() {
|
||||
this.shutdown();
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
logger.debug("shutting down " + this);
|
||||
|
||||
lock.lock();
|
||||
shutdownSignal.signal();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.PromExpositionFormat;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class AttachedMetricsSummaryReporter extends PeriodicTaskComponent {
|
||||
private final static Logger logger = LogManager.getLogger(AttachedMetricsPushReporter.class);
|
||||
|
||||
public AttachedMetricsSummaryReporter(NBComponent node, NBLabels extraLabels, long millis) {
|
||||
super(node, extraLabels, millis, true);
|
||||
}
|
||||
|
||||
public void task() {
|
||||
final Clock nowclock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
|
||||
|
||||
StringBuilder sb = new StringBuilder(1024 * 1024); // 1M pre-allocated to reduce heap churn
|
||||
List<NBMetric> metrics = new ArrayList<>();
|
||||
Iterator<NBComponent> allMetrics = NBComponentTraversal.traverseBreadth(getParent());
|
||||
allMetrics.forEachRemaining(m -> metrics.addAll(m.findComponentMetrics("")));
|
||||
|
||||
int total = 0;
|
||||
for (NBMetric metric : metrics) {
|
||||
sb = PromExpositionFormat.format(nowclock, sb, metric);
|
||||
total++;
|
||||
}
|
||||
AttachedMetricsSummaryReporter.logger.debug("formatted {} metrics in prom expo format", total);
|
||||
final String exposition = sb.toString();
|
||||
logger.info(() -> "prom exposition format:\n" + exposition);
|
||||
}
|
||||
}
|
@ -44,6 +44,7 @@ import io.nosqlbench.api.engine.metrics.reporters.*;
|
||||
import org.apache.logging.log4j.Marker;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.util.*;
|
||||
|
||||
import java.nio.file.Path;
|
||||
@ -119,21 +120,20 @@ public class NBCreators {
|
||||
return histogram;
|
||||
}
|
||||
|
||||
public AttachedMetricsSummaryReporter summaryReporter(long millis, String... labelspecs) {
|
||||
logger.debug("attaching summary reporter to " + base.description());
|
||||
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
|
||||
AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, millis);
|
||||
return reporter;
|
||||
}
|
||||
// public AttachedMetricsSummaryReporter summaryReporter(long millis, String... labelspecs) {
|
||||
// logger.debug("attaching summary reporter to " + base.description());
|
||||
// NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
|
||||
// AttachedMetricsSummaryReporter reporter = new AttachedMetricsSummaryReporter(base, extraLabels, millis);
|
||||
// return reporter;
|
||||
// }
|
||||
// public AttachedMetricCsvReporter csvReporter(int seconds, String dirpath, String... labelspecs) {
|
||||
// logger.debug("attaching summary reporter to " + base.description());
|
||||
// NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
|
||||
// AttachedMetricCsvReporter reporter = new AttachedMetricCsvReporter(base, extraLabels, Path.of(dirpath), seconds);
|
||||
// return reporter;
|
||||
// }
|
||||
public PromPushReporterComponent pushReporter(String targetUri, long millis, String config, String... labelspecs) {
|
||||
NBLabels extraLabels = NBLabels.forKV((Object[]) labelspecs);
|
||||
PromPushReporterComponent reporter = new PromPushReporterComponent(targetUri, config, millis, base,extraLabels);
|
||||
public PromPushReporterComponent pushReporter(String endpoint, long millis, NBLabels extraLabels) {
|
||||
PromPushReporterComponent reporter = new PromPushReporterComponent(this.base, URI.create(endpoint), millis, extraLabels);
|
||||
return reporter;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class AttachedMetricsSummaryReporterTest {
|
||||
@ -28,6 +29,7 @@ class AttachedMetricsSummaryReporterTest {
|
||||
private final Logger logger = LogManager.getLogger(AttachedMetricsSummaryReporterTest.class);
|
||||
|
||||
|
||||
@Disabled
|
||||
@Test
|
||||
public void testSingleObjectScope() {
|
||||
try (TestComponent root = new TestComponent("root", "root")) {
|
||||
@ -45,6 +47,7 @@ class AttachedMetricsSummaryReporterTest {
|
||||
// TODO: end lifecycle events need to be supported for metrics flushing
|
||||
|
||||
|
||||
@Disabled
|
||||
@Test
|
||||
public void testAttachedReporterScope() {
|
||||
try (NBComponentSubScope scope = new NBComponentSubScope()) {
|
||||
@ -52,7 +55,7 @@ class AttachedMetricsSummaryReporterTest {
|
||||
scope.add(root);
|
||||
TestComponent l1 = new TestComponent(root, "l1", "l1");
|
||||
NBMetricCounter counter = l1.create().counter("mycounter");
|
||||
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(1000);
|
||||
// AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(1000);
|
||||
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
|
||||
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);
|
||||
|
||||
@ -75,11 +78,12 @@ class AttachedMetricsSummaryReporterTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void testAttachedReporter() {
|
||||
TestComponent root = new TestComponent("root", "root");
|
||||
TestComponent l1 = new TestComponent(root, "l1", "l1");
|
||||
NBMetricCounter counter = l1.create().counter("mycounter");
|
||||
AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(5000);
|
||||
// AttachedMetricsSummaryReporter reporter = l1.create().summaryReporter(5000);
|
||||
NBFunctionGauge g1 = root.create().gauge("rootgauge", () -> 42d);
|
||||
NBFunctionGauge g2 = l1.create().gauge("leafgauge", () -> 48d);
|
||||
|
||||
@ -90,7 +94,7 @@ class AttachedMetricsSummaryReporterTest {
|
||||
Thread.sleep(2_000L);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
reporter.close();
|
||||
// reporter.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user