post merge fixes

This commit is contained in:
Jonathan Shook 2023-10-23 23:54:31 -05:00
parent 68f5fefd3d
commit 777c4aa3c7
13 changed files with 102 additions and 65 deletions

View File

@ -15,7 +15,7 @@ description: |
scenarios:
cassandra:
drop: run tags='block:drop' labels='target:astra' threads===1 cycles===2
drop: run tags='block:drop' threads==undef cycles==undef
# nb5 cql-vector2 cassandra.schema host=localhost localdc=datacenter1 dimensions=100
schema: run tags='block:schema' threads==undef cycles==undef
# nb5 cql-vector2 cassandra.rampup host=localhost localdc=datacenter1 dimensions=100 trainsize=1000000 dataset=glove-100-angular rate=10000
@ -25,8 +25,8 @@ scenarios:
run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:cassandra'
cycles=TEMPLATE(testsize) errors=counter,warn threads=1
astra_vectors:
drop: run tags='block:drop' labels='target:astra' threads===1 cycles===2 driverconfig=app.conf
schema: run tags='block:schema' labels='target:astra' threads===1 cycles===2
drop: run tags='block:drop' labels='target:astra' threads==undef cycles==undef driverconfig=app.conf
schema: run tags='block:schema' labels='target:astra' threads==undef cycles==undef
rampup: run tags='block:rampup' labels='target:astra' threads=100 cycles=TEMPLATE(trainsize) errors=counter
# search_and_index_unthrottled: >-
# run tags='block:search_and_index,optype=select' labels='target:astra'

View File

@ -202,7 +202,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
}
public String toString() {
return activityDef.getAlias() + ':' + this.runState + ':' + this.tally;
return (activityDef!=null ? activityDef.getAlias() : "unset_alias" ) + ':' + this.runState + ':' + this.tally ;
}
@Override

View File

@ -36,6 +36,7 @@ import io.nosqlbench.api.errors.OpConfigError;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.events.NBEvent;
import io.nosqlbench.components.events.ParamChange;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.StrideRateSpec;
@ -55,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @param <R> A type of runnable which wraps the operations for this type of driver.
* @param <S> The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider {
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence;
@ -203,6 +204,25 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
}
}
// @Override
// public synchronized void onActivityDefUpdate(final ActivityDef activityDef) {
// super.onActivityDefUpdate(activityDef);
//
// for (final DriverAdapter adapter : this.adapters.values())
// if (adapter instanceof NBReconfigurable reconfigurable) {
// NBConfigModel cfgModel = reconfigurable.getReconfigModel();
// final Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
// if (op_yaml_loc.isPresent()) {
// final Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
// final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
// cfgModel = cfgModel.add(workload.getConfigModel());
// }
// final NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
// reconfigurable.applyReconfig(cfg);
// }
//
// }
@Override
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
@ -257,4 +277,6 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
default -> super.onEvent(event);
}
}
}

View File

@ -35,22 +35,31 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
public class StandardActivityType<A extends StandardActivity<?,?>> extends SimpleActivity implements ActivityType<A> {
public class StandardActivityType<A extends StandardActivity<?,?>> implements ActivityType<A> {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final Map<String, DriverAdapter> adapters = new HashMap<>();
private final NBComponent parent;
// private final DriverAdapter<?, ?> adapter;
private final ActivityDef activityDef;
public StandardActivityType(final DriverAdapter<?,?> adapter, final ActivityDef activityDef, final NBComponent parent) {
super(parent,activityDef
.deprecate("type","driver")
.deprecate("yaml", "workload")
);
this.parent = parent;
// this.adapter = adapter;
this.activityDef = activityDef;
// super(parent,activityDef
// .deprecate("type","driver")
// .deprecate("yaml", "workload")
// );
adapters.put(adapter.getAdapterName(),adapter);
if (adapter instanceof ActivityDefAware) ((ActivityDefAware) adapter).setActivityDef(activityDef);
}
public StandardActivityType(final ActivityDef activityDef, final NBComponent parent) {
super(parent,activityDef);
this.parent = parent;
this.activityDef = activityDef;
// super(parent,activityDef);
}
@Override
@ -61,33 +70,10 @@ public class StandardActivityType<A extends StandardActivity<?,?>> extends Simpl
return (A) new StandardActivity(parent, activityDef);
}
@Override
public synchronized void onActivityDefUpdate(final ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
for (final DriverAdapter adapter : this.adapters.values())
if (adapter instanceof NBReconfigurable reconfigurable) {
NBConfigModel cfgModel = reconfigurable.getReconfigModel();
final Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (op_yaml_loc.isPresent()) {
final Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
cfgModel = cfgModel.add(workload.getConfigModel());
}
final NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
reconfigurable.applyReconfig(cfg);
}
}
@Override
public ActionDispenser getActionDispenser(final A activity) {
return new StandardActionDispenser(activity);
}
@Override
public void shutdownActivity() {
}
}

View File

@ -219,6 +219,12 @@ public class NBCLIScenarioParser {
if (!shortened.equals(sanitized)) {
logger.warn("The identifier or value '" + shortened + "' was sanitized to '" + sanitized + "' to be compatible with monitoring systems. You should probably change this to make diagnostics easier.");
StackTraceElement[] elements = Thread.currentThread().getStackTrace();
StringBuilder stb = new StringBuilder();
for (StackTraceElement element : elements) {
stb.append("\tat ").append(element).append("\n");
}
logger.warn("stacktrace: " + stb.toString());
}
return sanitized;
}

View File

@ -413,6 +413,8 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
finish(true);
return result;
}
} catch (Exception e2) {
throw new RuntimeException(e2);
}
}
@ -426,6 +428,9 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
* has been completed at least.
*/
private void awaitMotorsAtLeastRunning() {
if (this.exception!=null) {
return;
}
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = states.getMaxState();
if (maxState == RunState.Errored) {
@ -489,7 +494,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
RunState maxState = state.getMaxState();
activity.setRunState(maxState);
if (maxState == RunState.Errored) {
throw new RuntimeException("Error while waiting for activity completion:" + this.exception);
throw new RuntimeException("Error while waiting for activity completion" + (this.exception!=null ? this.exception.toString() : ""));
}
}

View File

@ -75,6 +75,7 @@ public class ScenarioActivitiesController extends NBBaseComponent {
private ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this);
activity.initActivity();
ActivityExecutor executor = new ActivityExecutor(activity);
Future<ExecutionResult> startedActivity = executorService.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);

View File

@ -85,7 +85,7 @@ public class ScenariosExecutor extends NBBaseComponent {
* @return the final scenario-result map.
*/
public ScenariosResults awaitAllResults() {
return awaitAllResults(Long.MAX_VALUE / 2, 60000); // half max value, to avoid overflow
return awaitAllResults(Long.MAX_VALUE / 2, 10000); // half max value, to avoid overflow
}
/**
@ -103,30 +103,40 @@ public class ScenariosExecutor extends NBBaseComponent {
long timeoutAt = System.currentTimeMillis() + timeout;
executor.shutdown();
boolean isShutdown = false;
while (!isShutdown && System.currentTimeMillis() < timeoutAt) {
long waitedAt = System.currentTimeMillis();
long updateAt = Math.min(timeoutAt, waitedAt + updateInterval);
while (!isShutdown && System.currentTimeMillis() < timeoutAt) {
while (!isShutdown && System.currentTimeMillis() < updateAt) {
try {
long timeRemaining = updateAt - System.currentTimeMillis();
isShutdown = executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
try {
while (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
if (System.currentTimeMillis()>=timeoutAt) {
throw new RuntimeException("executor still running scenarios after awaiting all results for " + timeout
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
}
logger.trace(() -> "waited " + (System.currentTimeMillis()-waitFrom) + " millis for scenarios");
updateAt = Math.min(timeoutAt, System.currentTimeMillis() + updateInterval);
}
logger.debug("scenarios executor shutdown after " + (System.currentTimeMillis() - waitedAt) + "ms.");
} catch (InterruptedException ignored) {
}
if (!isShutdown) {
throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
}
// while (!executor.isShutdown() && System.currentTimeMillis() < timeoutAt) {
// long waitedAt = System.currentTimeMillis();
// long updateAt = Math.min(timeoutAt, waitedAt + updateInterval);
// while (!executor.isShutdown() && System.currentTimeMillis() < timeoutAt) {
// while (!executor.isShutdown() && System.currentTimeMillis() < updateAt) {
// try {
// long timeRemaining = updateAt - System.currentTimeMillis();
// if (executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS)) {
// logger.info("executor shutdown during await");
// }
// } catch (InterruptedException ignored) {
// }
// }
// logger.trace(() -> "waited " + (System.currentTimeMillis()-waitFrom) + " millis for scenarios");
// updateAt = Math.min(timeoutAt, System.currentTimeMillis() + updateInterval);
// }
//
// logger.debug("scenarios executor shutdown after " + (System.currentTimeMillis() - waitedAt) + "ms.");
// }
//
// if (!executor.isShutdown()) {
// throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
// + "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
// }
Map<NBScenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
getAsyncResultStatus()
.entrySet()

View File

@ -100,6 +100,8 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
try (NBComponentSubScope scope = new NBComponentSubScope(scenario)) {
scenariosExecutor.execute(scenario, params);
// this.doReportSummaries(this.reportSummaryTo, this.result);
} catch (Exception e) {
results.error(e);
}
final ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");

View File

@ -361,29 +361,29 @@
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<version>22.3.3</version>
<version>23.0.1</version>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js</artifactId>
<version>22.3.3</version>
<version>23.0.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js-scriptengine</artifactId>
<version>23.1.0</version>
<version>23.0.1</version>
</dependency>
<dependency>
<groupId>org.graalvm.tools</groupId>
<artifactId>profiler</artifactId>
<version>22.3.3</version>
<version>23.0.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.graalvm.tools</groupId>
<artifactId>chromeinspector</artifactId>
<version>21.2.0</version>
<version>23.0.1</version>
<scope>runtime</scope>
</dependency>

View File

@ -235,6 +235,13 @@ public class PromExpositionFormat {
if (!word.equals(sanitized)) {
logger.warn("The identifier or value '" + word + "' was sanitized to '" + sanitized + "' to be compatible with monitoring systems. You should probably change this to make diagnostics easier.");
StackTraceElement[] elements = Thread.currentThread().getStackTrace();
StringBuilder stb = new StringBuilder();
for (StackTraceElement element : elements) {
stb.append("\tat ").append(element).append("\n");
}
logger.warn("stacktrace: " + stb.toString());
}
return sanitized;
}

View File

@ -60,8 +60,8 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
for (NBComponent extant : this.children) {
if (!child.getComponentOnlyLabels().isEmpty() && child.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels())) {
throw new RuntimeException("Adding second child under already-defined labels is not allowed:\n" +
" extant: " + extant + "\n" +
" adding: " + child);
" extant: (" + extant.getClass().getSimpleName() + ") " + extant.description() + "\n" +
" adding: (" + child.getClass().getSimpleName() + ") " + child.description());
}
}

View File

@ -155,5 +155,3 @@ control of all data flows. (Yes you could use the external gateway, but that was
part.) This further degraded the quality of metrics data by taking the timing and cadence of
metrics flows out of control of the client. It also put metrics flow behind two uncoordinated
polling mechanisms which degraded the immediacy of the metrics.