first half of cqld4 diagnostics, see todos

This commit is contained in:
Jonathan Shook 2024-01-31 09:05:54 -06:00
parent 79dd961698
commit 5f3813c83e
8 changed files with 425 additions and 5 deletions

View File

@ -18,16 +18,18 @@ package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.config.*;
import com.datastax.oss.driver.api.core.session.SessionBuilder;
import com.datastax.oss.driver.internal.core.config.composite.CompositeDriverConfigLoader;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.NodeFilterToDistanceEvaluatorAdapter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.cqld4.optionhelpers.OptionHelpers;
import io.nosqlbench.adapter.cqld4.wrapper.Cqld4LoadBalancerObserver;
import io.nosqlbench.adapter.cqld4.wrapper.Cqld4SessionBuilder;
import io.nosqlbench.adapter.cqld4.wrapper.NodeSummary;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.nbio.Content;
import io.nosqlbench.nb.api.nbio.NBIO;
import io.nosqlbench.nb.api.engine.util.SSLKsFactory;
@ -68,7 +70,13 @@ public class Cqld4Space implements AutoCloseable {
}
private CqlSession createSession(NBConfiguration cfg) {
CqlSessionBuilder builder = new CqlSessionBuilder();
NodeSummary diag = NodeSummary.valueOf(cfg.get("diag"));
CqlSessionBuilder builder = switch (diag) {
default -> new CqlSessionBuilder();
case NodeSummary.addr, NodeSummary.mid, NodeSummary.all -> new Cqld4SessionBuilder();
};
// stop insights for testing
OptionsMap defaults = new OptionsMap();
@ -211,6 +219,19 @@ public class Cqld4Space implements AutoCloseable {
}
builder.withConfigLoader(dcl);
// for (String profileName : dcl.getInitialConfig().getProfiles().keySet()) {
// logger.info("Installing load balancer observer in profile '" + profileName);
// DriverExecutionProfile profile = dcl.getInitialConfig().getProfile(profileName);
// String string = profile.getString(TypedDriverOption.LOAD_BALANCING_POLICY_CLASS.getRawOption());
// dcl.getInitialConfig().getDefaultProfile(profileName).getp
// Cqld4LoadBalancerObserver observer = new Cqld4LoadBalancerObserver(string);
// }
//
//
// builder.withNodeFilter()
if (builder instanceof Cqld4SessionBuilder cqld4sb) {
cqld4sb.setNodeSummarizer(diag);
}
CqlSession session = builder.build();
return session;
}
@ -303,11 +324,15 @@ public class Cqld4Space implements AutoCloseable {
.add(Param.optional("cloud_proxy_address", String.class, "Cloud Proxy Address"))
.add(Param.optional("maxpages", Integer.class, "Maximum number of pages allowed per CQL request"))
.add(Param.optional("maxretryreplace", Integer.class, "Maximum number of retry replaces with LWT for a CQL request"))
.add(Param.defaultTo("diag", "none").setDescription("What level of diagnostics to report"))
.add(SSLKsFactory.get().getConfigModel())
.add(getDriverOptionsModel())
.add(new OptionHelpers(new OptionsMap()).getConfigModel())
.asReadOnly();
}
private static enum Diagnostics {
queryplan
}
@Override

View File

@ -62,6 +62,11 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
return varbinder;
}
@Override
public LongFunction<CqlSession> getSessionFunc() {
return super.getSessionFunc();
}
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.wrapper;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import org.jetbrains.annotations.NotNull;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Cqld4DriverContext extends DefaultDriverContext {
private DriverContext delegate;
ConcurrentHashMap<String,LoadBalancingPolicy> wrappedPolicies = new ConcurrentHashMap<>();
private NodeSummary summarizer;
public Cqld4DriverContext(DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
super(configLoader, programmaticArguments);
}
@NotNull
@Override
public Map<String, LoadBalancingPolicy> getLoadBalancingPolicies() {
Map<String, LoadBalancingPolicy> loadBalancingPolicies = super.getLoadBalancingPolicies();
for (String profileName : loadBalancingPolicies.keySet()) {
wrappedPolicies.computeIfAbsent(profileName,s -> wrapPolicy(loadBalancingPolicies.get(profileName),this.summarizer));
}
return Collections.unmodifiableMap(wrappedPolicies);
}
private LoadBalancingPolicy wrapPolicy(LoadBalancingPolicy wrapped, NodeSummary summarizer) {
return new Cqld4LoadBalancerObserver(wrapped, summarizer);
}
public DriverContext setSummarizer(NodeSummary summarizer) {
this.summarizer = summarizer;
return this;
}
}

View File

@ -0,0 +1,172 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.wrapper;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class Cqld4LoadBalancerObserver implements LoadBalancingPolicy {
private final static Logger logger = LogManager.getLogger("NODELOG");
private final Map<String, Bucket> buffer = new ConcurrentHashMap<>();
private final Map<String, Bucket> totals = new ConcurrentHashMap<>();
private NodeSummary summarizer = NodeSummary.none;
private final LoadBalancingPolicy delegate;
private final long minReportGap = 1000;
private long lastReportTimeMs = System.currentTimeMillis();
private final long maxReportCount = 1000;
private AtomicLong untallied = new AtomicLong();
public Cqld4LoadBalancerObserver(LoadBalancingPolicy delegate, NodeSummary summarizer) {
logger.info("Loading CQL diagnostic layer");
this.delegate = delegate;
this.summarizer = summarizer;
}
@Override
public void init(@NotNull Map<UUID, Node> nodes, @NotNull LoadBalancingPolicy.DistanceReporter distanceReporter) {
delegate.init(nodes, distanceReporter);
}
@NotNull
@Override
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
Queue<Node> nodeQueue = delegate.newQueryPlan(request, session);
tabulate(nodeQueue);
return nodeQueue;
}
private void tabulate(Queue<Node> nodeQueue) {
untallied.getAndAdd(1);
StringBuilder sb = new StringBuilder();
Iterator<Node> nodes = nodeQueue.iterator();
while (nodes.hasNext()) {
Node node = nodes.next();
String bcname = node.getBroadcastAddress().map(InetSocketAddress::toString).orElse("UNSET");
buffer.computeIfAbsent(bcname, Bucket::new).increment();
sb.append("\n").append(NodeSummary.mid.summarize(node));
sb.append(" ;; ");
}
// sb.setLength(sb.length()-" ;; ".length());
// String summary = sb.toString();
// logger.info(summary);
}
public synchronized void checkpoint() {
emitSummary("window", buffer);
buffer.forEach((k,v) -> {
totals.computeIfAbsent(k,n -> new Bucket(k)).add(v);
});
buffer.clear();
emitSummary("totals", totals);
}
private void emitSummary(String desc, Map<String, Bucket> buffer) {
List<Bucket> values = new ArrayList<Bucket>(buffer.values());
Collections.sort(values);
StringBuilder sb = new StringBuilder();
for (Bucket bucket : values) {
sb.append(bucket.summary()).append(" ");
}
logger.info("node selection: (" + desc + ") " + sb.toString());
}
@Override
public void onAdd(@NotNull Node node) {
delegate.onAdd(node);
}
@Override
public void onUp(@NotNull Node node) {
delegate.onUp(node);
}
@Override
public void onDown(@NotNull Node node) {
delegate.onDown(node);
}
@Override
public void onRemove(@NotNull Node node) {
delegate.onRemove(node);
}
@Override
public void close() {
delegate.close();
}
public static class Bucket implements Comparable<Bucket> {
public long nanotime = System.nanoTime();
public String name;
public long count;
public Bucket(String name) {
this.name = name;
}
@Override
public int hashCode() {
return name.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Bucket b) {
return name.equals(b.name);
}
return false;
}
public synchronized void increment() {
count++;
}
public void add(Bucket v) {
count+=v.count;
}
@Override
public int compareTo(@NotNull Cqld4LoadBalancerObserver.Bucket o) {
return this.name.compareTo(o.name);
}
public String summary() {
return name + ":" + count;
}
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.wrapper;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.SessionWrapper;
import org.jetbrains.annotations.NotNull;
public class Cqld4ObserverSession extends SessionWrapper implements CqlSession {
Cqld4DriverContext contextWrapper;
public Cqld4ObserverSession(@NotNull Session delegate) {
super(delegate);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.wrapper;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.api.core.session.SessionBuilder;
import org.jetbrains.annotations.NotNull;
public class Cqld4SessionBuilder extends CqlSessionBuilder {
private NodeSummary summarizer;
@Override
protected CqlSession wrap(@NotNull CqlSession defaultSession) {
return new Cqld4ObserverSession(defaultSession);
}
@Override
protected DriverContext buildContext(DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
return new Cqld4DriverContext(configLoader,programmaticArguments).setSummarizer(summarizer);
}
public void setNodeSummarizer(NodeSummary summarizer) {
this.summarizer = summarizer;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.cqld4.wrapper;
import com.datastax.oss.driver.api.core.metadata.Node;
import java.net.InetSocketAddress;
public enum NodeSummary {
none, addr, mid, all;
public String summarize(Node node) {
StringBuilder sb = new StringBuilder();
sb.append(" bcaddr:").append(node.getBroadcastAddress().map(InetSocketAddress::toString).orElse(""));
if (this == addr) return sb.toString();
sb.append(" lsaddr:").append(node.getListenAddress().map(InetSocketAddress::toString).orElse(""));
sb.append(" rpcaddr:").append(node.getBroadcastRpcAddress().map(InetSocketAddress::toString).orElse(""));
sb.append(" DC:").append(node.getDatacenter());
sb.append(" R:").append(node.getRack());
sb.append(" SmV:").append(node.getSchemaVersion());
sb.append(" dist:").append(node.getDistance());
sb.append(" ").append(node.getState());
sb.append(" conn:").append(node.getOpenConnections());
if (this == mid) return sb.toString();
sb.append(" up:").append(String.format("%.3fS", ((double) node.getUpSinceMillis()) / 1000.0d));
sb.append(" reconn:").append(node.isReconnecting() ? "yes" : "no");
sb.append(" extras:").append(node.getExtras());
sb.append(" endpoit:").append(node.getEndPoint());
sb.append(" host_id:").append(node.getHostId());
return sb.toString();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* <P>This package contains wrapper logic for the CQL driver to allow more
* detailed diagnostic data to be captured. Because the Driver "v4" tries
* to protect developers from themselves and nearly disallows extension,
* lots of boilerplate had to be added to implement a wrapper.</p>
*
* <p>The purpose of this code is to see basic details from the load balancer's behavior.
* The operative part of this package is simply
* {@link io.nosqlbench.adapter.cqld4.wrapper.Cqld4LoadBalancerObserver},
* which intercepts query plan logic and logs details at a configurable level of details.
* </P>
*
* TODO: adapt diag markers from http
* TODO: inline the diagnostic filters into op dispenser logic
* TODO: make load balancer diagnostics record scoreboard data isochronously to a separate file
*/
package io.nosqlbench.adapter.cqld4.wrapper;