removing vestigial code

This commit is contained in:
Jonathan Shook 2025-01-02 12:15:06 -06:00
parent 4b14ea6f4a
commit 3ab18635bb
6 changed files with 1 additions and 462 deletions

View File

@ -49,8 +49,7 @@ public interface AsyncAction<D> extends Action {
* concurrency limits for the new cycle.
*
* Each action implementation is responsible for tracking and controlling
* its own limits of concurrency. The {@link BaseAsyncAction} base class is a
* convenient starting point for such implementations.
* its own limits of concurrency.
*
* If the action is known to have additional open slots for an operations to
* be started (according to the configured concurrency limits),

View File

@ -1,74 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityimpl.uniform.Activity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
/**
*
* @param <D> An type of state holder for an operation, holding everything unique to that cycle and operation
* @param <A> An type of of an Activity, a state holder for a runtime instance of an StandardActivity
*/
public abstract class BaseAsyncAction<D, A extends Activity> implements AsyncAction<D>,
Stoppable, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("BaseAsyncAction");
protected final A activity;
protected int slot;
protected boolean running = true;
public BaseAsyncAction(A activity, int slot) {
this.activity = activity;
this.slot = slot;
onActivityDefUpdate(activity.getActivityDef());
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
ParameterMap params = activityDef.getParams();
params.getOptionalInteger("async").orElseThrow(
() -> new RuntimeException("the async parameter is required to activate async actions"));
}
public boolean enqueue(TrackedOp<D> opc) {
startOpCycle(opc);
return (running);
}
/**
* Implementations that extend this base class can call this method in order to put
* an operation in flight. Implementations should call either {@link TrackedOp#skip(int)}
* or {@link TrackedOp#start()}}.
*
* @param opc A tracked operation with state of parameterized type D
*/
public abstract void startOpCycle(TrackedOp<D> opc);
@Override
public void requestStop() {
logger.info(() -> this + " requested to stop.");
this.running = false;
}
}

View File

@ -1,187 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core.ops.fluent;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.uniform.Activity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
/**
This tracker keeps track of the state of operations associated with it.
@param <D>
The payload data type of the associated Op, based on OpImpl */
public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
private final AtomicInteger pendingOps = new AtomicInteger(0);
private final String label;
private final long slot;
private final Counter pendingOpsCounter;
private final Timer cycleServiceTimer;
private final Timer cycleResponseTimer;
private int maxPendingOps = 1;
private LongFunction<D> cycleOpFunction;
public OpTrackerImpl(Activity activity, long slot) {
this.slot = slot;
this.label = "tracker-" + slot + "_" + activity.getAlias();
this.pendingOpsCounter = activity.metrics.pendingOpsCounter;
this.cycleServiceTimer = activity.metrics.cycleServiceTimer;
this.cycleResponseTimer = activity.metrics.cycleResponseTimer;
}
// for testing
public OpTrackerImpl(
String name, int slot, Timer cycleServiceTimer, Timer cycleResponseTimer,
Counter pendingOpsCounter
) {
this.label = name;
this.slot = slot;
this.cycleResponseTimer = cycleResponseTimer;
this.cycleServiceTimer = cycleServiceTimer;
this.pendingOpsCounter = pendingOpsCounter;
}
@Override
public void onOpStarted(StartedOp<D> op) {
pendingOps.incrementAndGet();
pendingOpsCounter.inc();
}
@Override
public void onOpSuccess(SucceededOp<D> op) {
pendingOpsCounter.dec();
int pending = this.pendingOps.decrementAndGet();
cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
if (cycleResponseTimer != null) {
cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS);
}
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
}
}
@Override
public void onOpSkipped(SkippedOp<D> op) {
pendingOpsCounter.dec();
int pending = this.pendingOps.decrementAndGet();
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
}
}
@Override
public void onOpFailure(FailedOp<D> op) {
pendingOpsCounter.dec();
int pending = this.pendingOps.decrementAndGet();
cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
if (cycleResponseTimer != null) {
cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS);
}
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
}
}
@Override
public void setMaxPendingOps(int maxPendingOps) {
this.maxPendingOps = maxPendingOps;
synchronized (this) {
notifyAll();
}
}
@Override
public boolean isFull() {
return this.pendingOps.intValue() >= maxPendingOps;
}
@Override
public int getPendingOps() {
return pendingOps.intValue();
}
@Override
public void setCycleOpFunction(LongFunction<D> newOpFunction) {
this.cycleOpFunction = newOpFunction;
}
@Override
public TrackedOp<D> newOp(long cycle, OpEvents<D> strideTracker) {
D opstate = cycleOpFunction.apply(cycle);
OpImpl<D> op = new EventedOpImpl<>(this, strideTracker);
op.setCycle(cycle);
op.setData(opstate);
return op;
}
public int getMaxPendingOps() {
return maxPendingOps;
}
@Override
public synchronized boolean awaitCompletion(long timeout) {
long endAt = System.currentTimeMillis() + timeout;
while (getPendingOps() > 0 && System.currentTimeMillis() < endAt) {
try {
long waitfor = Math.max(0, endAt - System.currentTimeMillis());
wait(waitfor);
} catch (InterruptedException ignored) {
}
}
return getPendingOps() == 0;
}
@Override
public String toString() {
return "OpTracker-" + label + ":" + this.slot + " " + this.pendingOps.get() + "/" + maxPendingOps + " ops ";
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
this.maxPendingOps = getMaxPendingOpsForThisThread(activityDef);
}
private int getMaxPendingOpsForThisThread(ActivityDef def) {
int maxTotalOpsInFlight = def.getParams().getOptionalInteger("async").orElse(1);
int threads = def.getThreads();
return (maxTotalOpsInFlight / threads) + (slot < (maxTotalOpsInFlight % threads) ? 1 : 0);
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityimpl.uniform.Activity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
public interface ActivityDispenser {
Activity getActivity(ActivityDef activityDef);
}

View File

@ -1,137 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.container;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import org.graalvm.polyglot.Value;
import org.graalvm.polyglot.proxy.ProxyObject;
import javax.script.Bindings;
import java.util.*;
import java.util.stream.Collectors;
/**
* Provide a bindings wrapper around a ScenarioController,
*/
public class ActivityBindings implements Bindings, ProxyObject {
private final ContainerActivitiesController scenario;
private final Map<String, Bindings> elementMap = new HashMap<String, Bindings>();
public ActivityBindings(ContainerActivitiesController containerActivitiesController) {
this.scenario = containerActivitiesController;
}
@Override
public Object put(String name, Object value) {
throw new RuntimeException("ScenarioBindings do not allow put(...)");
}
@Override
public void putAll(Map<? extends String, ?> toMerge) {
throw new RuntimeException("ScenarioBindings do not allow putAll(...)");
}
@Override
public void clear() {
throw new RuntimeException("ScenarioBindings do not allow clear(...)");
}
@Override
public Set<String> keySet() {
return scenario.getAliases();
}
@Override
public Collection<Object> values() {
return wrap(scenario.getActivityDefs());
}
private Collection<Object> wrap(List<ActivityDef> activityDefs) {
return activityDefs
.stream()
.map(s -> (Bindings) s)
.collect(Collectors.toList());
}
@Override
public Set<Entry<String, Object>> entrySet() {
Set<Entry<String,Object>> newset = new HashSet<>();
for (ActivityDef activityDef : scenario.getActivityDefs()) {
newset.add(new AbstractMap.SimpleImmutableEntry<String, Object>(activityDef.getAlias(),activityDef));
}
return newset;
}
@Override
public int size() {
return scenario.getActivityDefs().size();
}
@Override
public boolean isEmpty() {
return scenario.getActivityDefs().isEmpty();
}
@Override
public boolean containsKey(Object key) {
return scenario.getAliases().contains(String.valueOf(key));
}
@Override
public boolean containsValue(Object value) {
throw new RuntimeException("Should this be used?");
}
@Override
public Bindings get(Object key) {
Bindings activityParams = scenario.getActivityDef(String.valueOf(key)).getParams();
return activityParams;
}
@Override
public Object remove(Object key) {
throw new RuntimeException("this is not the advised way to forceStopMotors an activity");
// scenario.forceStopMotors(String.valueOf(key));
}
@Override
public Object getMember(String key) {
Bindings bindings = get(key);
return bindings;
}
@Override
public Object getMemberKeys() {
ArrayList<String> keys = new ArrayList<>(keySet());
return keys;
}
@Override
public boolean hasMember(String key) {
boolean b = containsKey(key);
return b;
}
@Override
public void putMember(String key, Value value) {
if (value.isHostObject()) {
put(key,value.asHostObject());
} else {
throw new RuntimeException("Unable to put a non-host object into the activities bindings layer:" + value);
}
}
}

View File

@ -1,38 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core.ops.fluent;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.SucceededOp;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.EventedOpImpl;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.StartedOp;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import org.junit.jupiter.api.Test;
public class OpTrackerImplTest {
@Test
public void testLifeCycle() {
OpTrackerImpl<String> tracker = new OpTrackerImpl<String>("test", 0, new Timer(), new Timer(), new Counter());
TrackedOp<String> tracked = new EventedOpImpl<>(tracker);
StartedOp<String> started = tracked.start();
tracker.onOpStarted(started);
SucceededOp stop = started.succeed(23);
}
}