Merge branch 'main' of github.com:nosqlbench/nosqlbench

This commit is contained in:
Jonathan Shook 2023-01-09 11:53:05 -06:00
commit b8e0bcd266
19 changed files with 734 additions and 323 deletions

View File

@ -40,9 +40,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-stdout</artifactId>
<version>4.17.32-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,247 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class TcpAdapterSpace {
private final static Logger logger = LogManager.getLogger(TcpAdapterSpace.class);
private final String name;
private final NBConfiguration config;
public TcpAdapterSpace(String name, NBConfiguration config) {
this.name = name;
this.config = config;
}
protected PrintWriter createPrintWriter() {
SocketFactory socketFactory = SocketFactory.getDefault();
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSocketFactory(sslCfg);
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(Integer.class, "port").orElse(12345);
try {
Socket socket = socketFactory.createSocket(host, port);
logger.info(() -> "connected to " + socket.toString());
return new PrintWriter(socket.getOutputStream());
} catch (IOException e) {
throw new RuntimeException("Error opening socket:" + e, e);
}
}
public void TCPServerActivity(ActivityDef activityDef) {
// super(activityDef);
// boolean sslEnabled = activityDef.getParams().getOptionalBoolean("ssl").orElse(false);
// this.capacity=activityDef.getParams().getOptionalInteger("capacity").orElse(10);
// queue = new LinkedBlockingQueue<>(capacity);
//
// if (sslEnabled) {
//
// NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(activityDef.getParams());
// socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(sslCfg);
// } else {
// socketFactory = ServerSocketFactory.getDefault();
// }
}
// public void shutdownActivity() {
// super.shutdownActivity();
// for (TCPServerActivity.Shutdown toClose : managedShutdown) {
// toClose.shutdown();
// }
// }
// server write
// @Override
// public synchronized void write(String statement) {
// while (true) {
// try {
// queue.put(statement);
// return;
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// }
// create server writer
//
// @Override
// protected synchronized Writer createPrintWriter() {
//
// String host = getActivityDef().getParams().getOptionalString("host").orElse("localhost");
// int port = getActivityDef().getParams().getOptionalInteger("port").orElse(12345);
//
// if (listenerSocket == null || listenerSocket.isClosed()) {
// try {
// InetAddress hostAddr = InetAddress.getByName(host);
// listenerSocket = socketFactory.createServerSocket(port, 10, hostAddr);
// if (socketFactory instanceof SSLServerSocketFactory) {
// logger.info(() -> "SSL enabled on server socket " + listenerSocket);
// }
// TCPServerActivity.SocketAcceptor socketAcceptor = new TCPServerActivity.SocketAcceptor(queue, listenerSocket);
// managedShutdown.add(socketAcceptor);
// Thread acceptorThread = new Thread(socketAcceptor);
// acceptorThread.setDaemon(true);
// acceptorThread.setName("Listener/" + listenerSocket);
// acceptorThread.start();
// } catch (IOException e) {
// throw new RuntimeException("Error listening on listenerSocket:" + e, e);
// }
// }
//
// TCPServerActivity.QueueWriterAdapter queueWriterAdapter = new TCPServerActivity.QueueWriterAdapter(this.queue);
// logger.info(() -> "initialized queue writer:" + queueWriterAdapter);
// return queueWriterAdapter;
//
// }
// socket writer
// public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
// this.sourceQueue = sourceQueue;
// try {
// outputStream = connectedSocket.getOutputStream();
// this.writer = new OutputStreamWriter(outputStream);
// //connectedSocket.shutdownInput();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// server thread
// public void run() {
// try (Writer writer = this.writer) {
// while (true) {
// while (!sourceQueue.isEmpty() || running) {
// try {
// String data = sourceQueue.take();
// writer.write(data);
// writer.flush();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// try {
// Thread.sleep(10);
// } catch (InterruptedException ignored) {
// }
// }
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
//
// }
// server writer adapter
// public static class QueueWriterAdapter extends Writer {
// private BlockingQueue<String> queue;
//
// public QueueWriterAdapter(BlockingQueue<String> queue) {
// this.queue = queue;
// }
//
// @Override
// public synchronized void write( char[] cbuf, int off, int len) {
// while (true) {
// try {
// queue.put(new String(cbuf, off, len));
// return;
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
// }
//
// @Override
// public synchronized void flush() throws IOException {
// }
//
// @Override
// public synchronized void close() throws IOException {
// flush();
// queue = null;
// }
//
// }
// server socket acceptor
// public class SocketAcceptor implements Runnable, TCPServerActivity.Shutdown {
// private final BlockingQueue<String> queue;
// private final ServerSocket serverSocket;
// private boolean running = true;
//
// public SocketAcceptor(BlockingQueue<String> queue, ServerSocket serverSocket) {
// this.queue = queue;
// this.serverSocket = serverSocket;
// }
//
// public void shutdown() {
// this.running = false;
// }
//
// @Override
// public void run() {
// try (ServerSocket serverSocket = this.serverSocket) {
// while (running) {
// serverSocket.setSoTimeout(1000);
// serverSocket.setReuseAddress(true);
// try {
// Socket connectedSocket = serverSocket.accept();
// TCPServerActivity.SocketWriter writer = new TCPServerActivity.SocketWriter(queue, connectedSocket);
// TCPServerActivity.this.managedShutdown.add(writer);
// Thread writerThread = new Thread(writer);
// writerThread.setName("SocketWriter/" + connectedSocket);
// writerThread.setDaemon(true);
// writerThread.start();
// logger.info(() -> "Started writer thread for " + connectedSocket);
// } catch (SocketTimeoutException ignored) {
// }
// }
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
// }
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpAdapterSpace.class)
.asReadOnly();
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
@Service(value= DriverAdapter.class,selector = "tcp")
public class TcpDriverAdapter extends BaseDriverAdapter<Op,TcpAdapterSpace> {
@Override
public OpMapper<Op> getOpMapper() {
return new TcpOpMapper(this);
}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapter.tcpserver.TcpServerAdapterSpace;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.Socket;
public class TcpClientAdapterSpace {
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
public TcpClientAdapterSpace(NBConfiguration config) {
this.config = config;
this.writer = createPrintWriter();
}
protected PrintWriter createPrintWriter() {
SocketFactory socketFactory = SocketFactory.getDefault();
boolean sslEnabled = config.getOptional(boolean.class, "ssl").orElse(false);
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSocketFactory(sslCfg);
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(12345);
try {
Socket socket = socketFactory.createSocket(host, port);
logger.info("connected to " + socket.toString());
return new PrintWriter(socket.getOutputStream());
} catch (IOException e) {
throw new RuntimeException("Error opening socket:" + e, e);
}
}
public void writeflush(String text) {
try {
writer.write(text);
writer.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpClientAdapterSpace.class)
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("host","localhost")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("newline",true)
.setDescription("whether to automatically add a missing newline to the end of any output\n")
)
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("""
Which format to use.
If provided, the format will override any statement formats provided by the YAML.
If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger(TcpClientDriverAdapter.class);
private final static StdoutDriverAdapter adap = new StdoutDriverAdapter();
@Override
public OpMapper<TcpClientOp> getOpMapper() {
DriverSpaceCache<? extends TcpClientAdapterSpace> ctxCache = getSpaceCache();
return new TcpClientOpMapper(this,ctxCache);
}
@Override
public Function<String, ? extends TcpClientAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpClientAdapterSpace(cfg);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(super.getConfigModel())
.add(TcpClientAdapterSpace.getConfigModel());
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
return adap.getSyntheticOpTemplates(stmtsDocList, cfg);
}
}

View File

@ -14,20 +14,20 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.RunnableOp;
public class TcpOpDispenser extends BaseOpDispenser<Op,TcpAdapterSpace> {
public class TcpClientOp implements RunnableOp {
public TcpOpDispenser(TcpDriverAdapter adapter, ParsedOp op) {
super(adapter, op);
private final TcpClientAdapterSpace ctx;
private final String text;
public TcpClientOp(TcpClientAdapterSpace ctx, String text) {
this.ctx = ctx;
this.text = text;
}
@Override
public Op apply(long cycle) {
return new TcpOp(cycle);
public void run() {
ctx.writeflush(text);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
this.outFunction = stringFunction;
}
@Override
public TcpClientOp apply(long value) {
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpClientOp(ctx,output);
}
}

View File

@ -14,23 +14,32 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpclient;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
public class TcpOpMapper implements OpMapper<Op> {
import java.util.function.LongFunction;
private final TcpDriverAdapter adapter;
public class TcpClientOpMapper implements OpMapper<TcpClientOp> {
public TcpOpMapper(TcpDriverAdapter adapter) {
private final DriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache;
private final TcpClientDriverAdapter adapter;
public TcpClientOpMapper(TcpClientDriverAdapter adapter, DriverSpaceCache<? extends TcpClientAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends Op> apply(ParsedOp op) {
return new TcpOpDispenser(adapter,op);
public OpDispenser<TcpClientOp> apply(ParsedOp op) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpClientAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
return new TcpClientOpDispenser(adapter,op,ctxfunc);
}
}

View File

@ -0,0 +1,295 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.ServerSocket;
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
public class TcpServerAdapterSpace implements AutoCloseable{
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
private LinkedBlockingQueue<String> queue;
private ServerSocket listenerSocket;
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
public TcpServerAdapterSpace(NBConfiguration config) {
this.config = config;
this.writer = createPrintWriter();
}
private Writer createPrintWriter() {
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
queue = new LinkedBlockingQueue<>(capacity);
ServerSocketFactory socketFactory;
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(sslCfg);
} else {
socketFactory = ServerSocketFactory.getDefault();
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(12345);
if (listenerSocket == null || listenerSocket.isClosed()) {
try {
InetAddress hostAddr = InetAddress.getByName(host);
listenerSocket = socketFactory.createServerSocket(port, 10, hostAddr);
if (socketFactory instanceof SSLServerSocketFactory) {
logger.info("SSL enabled on server socket " + listenerSocket);
}
SocketAcceptor socketAcceptor = new SocketAcceptor(queue, listenerSocket);
managedShutdown.add(socketAcceptor);
Thread acceptorThread = new Thread(socketAcceptor);
acceptorThread.setDaemon(true);
acceptorThread.setName("Listener/" + listenerSocket);
acceptorThread.start();
} catch (IOException e) {
throw new RuntimeException("Error listening on listenerSocket:" + e, e);
}
}
QueueWriterAdapter queueWriterAdapter = new QueueWriterAdapter(this.queue);
logger.info("initialized queue writer:" + queueWriterAdapter);
return queueWriterAdapter;
}
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
public void writeflush(String text) {
try {
if(this.writer == null)
{
this.writer = createPrintWriter();
}
this.writer.write(text);
this.writer.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(TcpServerAdapterSpace.class)
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("capacity",10)
.setDescription("the capacity of the queue")
)
.add(
Param.defaultTo("host","localhost")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("newline",true)
.setDescription("whether to automatically add a missing newline to the end of any output\n")
)
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("""
Which format to use.
"If provided, the format will override any statement formats provided by the YAML.
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();
}
private interface Shutdown {
void shutdown();
}
public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue;
private final Socket connectedSocket;
private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue;
this.connectedSocket = connectedSocket;
}
public void shutdown() {
this.running = false;
}
@Override
public void run() {
OutputStream outputStream = null;
try {
outputStream = connectedSocket.getOutputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
try (Writer runWriter = new OutputStreamWriter(outputStream);) {
while (running ) {
if(!sourceQueue.isEmpty()) {
try {
//String data = sourceQueue.take();
String data = sourceQueue.poll(1, TimeUnit.SECONDS);
if(data == null) {
continue;
}
runWriter.write(data);
runWriter.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static class QueueWriterAdapter extends Writer {
private BlockingQueue<String> queue;
private volatile boolean running = true;
public QueueWriterAdapter(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public synchronized void write( char[] cbuf, int off, int len) {
String message = new String(cbuf, off, len);
while (running) {
try {
if(queue.offer(message, 1, TimeUnit.SECONDS)) {
return;
}
} catch (InterruptedException ignored) {
logger.debug("QueueWriterAdapter was interrupted");
running =false;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public synchronized void flush() throws IOException {
}
@Override
public synchronized void close() throws IOException {
flush();
running =false;
queue = null;
}
}
public class SocketAcceptor implements Runnable, Shutdown {
private final BlockingQueue<String> queue;
private final ServerSocket serverSocket;
private boolean running = true;
public SocketAcceptor(BlockingQueue<String> queue, ServerSocket serverSocket) {
this.queue = queue;
this.serverSocket = serverSocket;
}
public void shutdown() {
this.running = false;
}
@Override
public void run() {
try (ServerSocket runServerSocket = this.serverSocket) {
while (running) {
runServerSocket.setSoTimeout(1000);
runServerSocket.setReuseAddress(true);
try {
Socket connectedSocket = runServerSocket.accept();
SocketWriter writer = new SocketWriter(queue, connectedSocket);
managedShutdown.add(writer);
Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(false);
writerThread.start();
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException e) {
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
@Service(value= DriverAdapter.class, selector="tcpserver")
public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpServerAdapterSpace> implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger(TcpServerDriverAdapter.class);
private final static StdoutDriverAdapter adap = new StdoutDriverAdapter();
@Override
public OpMapper<TcpServerOp> getOpMapper() {
DriverSpaceCache<? extends TcpServerAdapterSpace> ctxCache = getSpaceCache();
return new TcpServerOpMapper(this,ctxCache);
}
@Override
public Function<String, ? extends TcpServerAdapterSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new TcpServerAdapterSpace(cfg);
}
@Override
public NBConfigModel getConfigModel() {
return ConfigModel.of(this.getClass())
.add(super.getConfigModel())
.add(TcpServerAdapterSpace.getConfigModel());
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
return adap.getSyntheticOpTemplates(stmtsDocList, cfg);
}
}

View File

@ -14,11 +14,19 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.tcp;
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.RunnableOp;
public class TcpOp implements Op {
public TcpOp(long cycle) {
public class TcpServerOp implements RunnableOp {
private final TcpServerAdapterSpace ctx;
private final String text;
public TcpServerOp(TcpServerAdapterSpace ctx, String text) {
this.ctx = ctx;
this.text = text;
}
public void run() {
ctx.writeflush(text);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
this.outFunction = stringFunction;
}
@Override
public TcpServerOp apply(long value) {
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpServerOp(ctx,output);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpMapper implements OpMapper<TcpServerOp> {
private final DriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache;
private final TcpServerDriverAdapter adapter;
public TcpServerOpMapper(TcpServerDriverAdapter adapter, DriverSpaceCache<? extends TcpServerAdapterSpace> ctxcache) {
this.ctxcache = ctxcache;
this.adapter = adapter;
}
@Override
public OpDispenser<TcpServerOp> apply(ParsedOp op) {
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
LongFunction<TcpServerAdapterSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
return new TcpServerOpDispenser(adapter,op,ctxfunc);
}
}

View File

@ -5,27 +5,20 @@
The tcpclient driver is based on the behavior of the stdout driver. You configure the tcpclient driver in exactly the
same way as the stdout driver, except for the additional parameters shown here.
The tcpclient driver connects to the configured server address and port (a socket address). When connected, it sends any
buffered lines of data to the server.
If the buffer is primed with data when the client is connected to a server, it will send all of the data at once. After
this, data is added to the buffer at whatever cyclerate the activity is configured for. If you add data to the buffer
faster than you can send it to a connected server, you will have a number of failed operations.
However, the opposite is not true. You should generally ensure that you can send the data as fast as you provide it, and
the error counts give you a relatively easy way to verify this. If you wish to disable this behavior, set the retries to
a very high value. In this case, the tries metric will still give you some measure of internal buffer saturation.
The tcpclient driver connects to a configured host and port (a socket address). When a server is listening on that socket,
then the data for each cycle is written via the socket just like stdout would write.
## Examples
Run a stdout activity named 'stdout-test', with definitions from activities/stdout-test.yaml
... driver=tcpserver yaml=stdout-test
... driver=tcpclient yaml=stdout-test
## Driver Parameters
- **retry_delay** - The internal retry frequency at which the internal cycle loop will attempt to add data to the
buffer. This applies when the internal buffer is full and the client isn't able to send data from it.
buffer. This applies when the internal buffer is full and no clients are consuming data from it.
- unit: milliseconds
- default: 1000
- dynamic: false
@ -33,6 +26,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
failed.
- default: 3
- dynamic: false
- **ssl** - boolean to enable or disable ssl
- default: false
- dynamic: false
@ -41,10 +35,10 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
[Additional parameters may need to be provided](../../../../driver-cql/src/main/resources/ssl.md).
- **host** - this is the name to connect to (remote server IP address)
- **host** - this is the name to bind to (local interface address)
- default: localhost
- dynamic: false
- **port** - this is the name of the port to connect to (remote server port)
- **port** - this is the name of the port to listen on
- default: 12345
- dynamic: false
- **capacity** - the size of the internal blocking queue

View File

@ -9,6 +9,8 @@ The tcpserver driver listens on a configured host and port (a socket address). W
internal queue is buffered to them as long as there is data in it. For each cycle of data in the internal buffer, one of
the connected clients will get it in unspecified order.
The driver activity will block as long as there are still messages in the queue (Max:capacity). To ensure that the queue is empties and the activity shuts down correctly, one must make sure that a client connects to the server to receive the queued messages.
If the buffer is primed with data when a client is connected it will get all of the data at once. After this, data is
added to the buffer at whatever cyclerate the activity is configured for. If you add data to the buffer faster than you
can consume it with connected clients, you will have a number of failed operations.

View File

@ -1,3 +0,0 @@
# tcp help topics
- tcpclient
- tcpserver

View File

@ -160,10 +160,16 @@ public class NBConfiguration {
if (o==null) {
return Optional.empty();
}
if (type.isAssignableFrom(o.getClass())) {
if (type.isInstance(o)) {
return Optional.of((T) o);
} else if (type.isAssignableFrom(o.getClass())) {
return Optional.of((T)type.cast(o));
} else if (NBTypeConverter.canConvert(o, type)) {
return Optional.of((T) NBTypeConverter.convert(o, type));
} else {
throw new NBConfigError("config param " + Arrays.toString(names) +" was not assignable to class '" + type.getCanonicalName() + "'");
}
throw new NBConfigError("config param " + Arrays.toString(names) +" was not assignable to class '" + type.getCanonicalName() + "'");
}
public <T> T getOrDefault(String name, T defaultValue) {

View File

@ -118,6 +118,12 @@
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-tcp</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-kafka</artifactId>

View File

@ -30,7 +30,7 @@ public class MetricsIntegrationTest {
@Test
public void testHistogramLogger() {
ActivityDef ad = ActivityDef.parseActivityDef("alias=foo;driver=diag;op=noop");
Histogram testhistogram = ActivityMetrics.histogram(ad, "testhistogram", 4);
Histogram testhistogram = ActivityMetrics.histogram(ad, "testhistogram", 3);
ActivityMetrics.addHistoLogger("testsession", ".*","testhisto.log","1s");
testhistogram.update(400);
testhistogram.getSnapshot();