nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d

This commit is contained in:
Mike Yaacoub 2023-01-04 15:24:56 -05:00
parent 376a9b04aa
commit 1b2655c272
7 changed files with 226 additions and 1 deletions

View File

@ -26,6 +26,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
<<<<<<< HEAD
=======
import java.io.FileNotFoundException;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
@ -33,9 +37,16 @@ import java.net.Socket;
public class TcpClientAdapterSpace {
<<<<<<< HEAD
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
=======
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
private PrintWriter console;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public TcpClientAdapterSpace(NBConfiguration config) {
this.config = config;
@ -52,7 +63,11 @@ public class TcpClientAdapterSpace {
}
String host = config.getOptional("host").orElse("localhost");
<<<<<<< HEAD
int port = config.getOptional(int.class, "port").orElse(12345);
=======
int port = config.getOptional(int.class, "port").orElse(8080);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
try {
Socket socket = socketFactory.createSocket(host, port);
@ -76,11 +91,23 @@ public class TcpClientAdapterSpace {
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("host","localhost")
<<<<<<< HEAD
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
=======
.setDescription("")
)
.add(
Param.defaultTo("port",8080)
.setDescription("")
)
.add(
Param.defaultTo("filename","tcpclient")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpclient, not a file.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.add(
Param.defaultTo("newline",true)
@ -89,6 +116,7 @@ public class TcpClientAdapterSpace {
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
<<<<<<< HEAD
.setDescription("""
Which format to use.
If provided, the format will override any statement formats provided by the YAML.
@ -101,6 +129,18 @@ public class TcpClientAdapterSpace {
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
=======
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.asReadOnly();

View File

@ -33,6 +33,10 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
<<<<<<< HEAD
=======
import java.util.regex.Pattern;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {

View File

@ -23,21 +23,37 @@ import java.util.function.LongFunction;
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
<<<<<<< HEAD
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
=======
private final LongFunction<TcpClientAdapterSpace> ctxfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final LongFunction<String> outFunction;
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
super(adapter,cmd);
<<<<<<< HEAD
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
=======
this.ctxfunc = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@Override
public TcpClientOp apply(long value) {
<<<<<<< HEAD
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
=======
TcpClientAdapterSpace ctx = ctxfunc.apply(value);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
String output = outFunction.apply(value);
return new TcpClientOp(ctx,output);
}

View File

@ -20,13 +20,26 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
<<<<<<< HEAD
=======
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.ServerSocket;
<<<<<<< HEAD
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
=======
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.io.OutputStream;
@ -35,12 +48,33 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
<<<<<<< HEAD
import java.util.concurrent.TimeUnit;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
public class TcpServerAdapterSpace implements AutoCloseable{
<<<<<<< HEAD
=======
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
@ -49,6 +83,10 @@ public class TcpServerAdapterSpace implements AutoCloseable{
private ServerSocket listenerSocket;
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
<<<<<<< HEAD
=======
private ServerSocketFactory socketFactory;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public TcpServerAdapterSpace(NBConfiguration config) {
this.config = config;
@ -60,7 +98,11 @@ public class TcpServerAdapterSpace implements AutoCloseable{
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
queue = new LinkedBlockingQueue<>(capacity);
<<<<<<< HEAD
ServerSocketFactory socketFactory;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
@ -70,7 +112,11 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
String host = config.getOptional("host").orElse("localhost");
<<<<<<< HEAD
int port = config.getOptional(int.class, "port").orElse(12345);
=======
int port = config.getOptional(int.class, "port").orElse(8080);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
if (listenerSocket == null || listenerSocket.isClosed()) {
try {
@ -95,6 +141,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
return queueWriterAdapter;
}
<<<<<<< HEAD
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
@ -111,6 +158,8 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
}
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public void writeflush(String text) {
try {
if(this.writer == null)
@ -128,6 +177,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("capacity",10)
<<<<<<< HEAD
.setDescription("the capacity of the queue")
)
.add(
@ -137,6 +187,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
=======
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("host","localhost")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("port",8080)
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("filename","tcpserver")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.add(
Param.defaultTo("newline",true)
@ -145,6 +210,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
<<<<<<< HEAD
.setDescription("""
Which format to use.
"If provided, the format will override any statement formats provided by the YAML.
@ -157,6 +223,18 @@ public class TcpServerAdapterSpace implements AutoCloseable{
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
=======
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.asReadOnly();
@ -166,13 +244,28 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue;
<<<<<<< HEAD
private final Socket connectedSocket;
=======
private final OutputStream outputStream;
private final OutputStreamWriter writer;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue;
<<<<<<< HEAD
this.connectedSocket = connectedSocket;
=======
try {
outputStream = connectedSocket.getOutputStream();
this.writer = new OutputStreamWriter(outputStream);
//connectedSocket.shutdownInput();
} catch (Exception e) {
throw new RuntimeException(e);
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
public void shutdown() {
@ -181,6 +274,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
<<<<<<< HEAD
OutputStream outputStream = null;
try {
outputStream = connectedSocket.getOutputStream();
@ -198,6 +292,15 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
runWriter.write(data);
runWriter.flush();
=======
try (Writer writer = this.writer) {
while (running) {
while (!sourceQueue.isEmpty() ) {
try {
String data = sourceQueue.take();
writer.write(data);
writer.flush();
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -210,6 +313,12 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} catch (Exception e) {
throw new RuntimeException(e);
}
<<<<<<< HEAD
=======
finally {
//TODO: Add the shutdown logic to do after closing the socket
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@ -218,14 +327,18 @@ public class TcpServerAdapterSpace implements AutoCloseable{
public static class QueueWriterAdapter extends Writer {
private BlockingQueue<String> queue;
<<<<<<< HEAD
private volatile boolean running = true;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public QueueWriterAdapter(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public synchronized void write( char[] cbuf, int off, int len) {
<<<<<<< HEAD
String message = new String(cbuf, off, len);
while (running) {
try {
@ -235,6 +348,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} catch (InterruptedException ignored) {
logger.debug("QueueWriterAdapter was interrupted");
running =false;
=======
while (true) {
try {
queue.put(new String(cbuf, off, len));
return;
} catch (InterruptedException ignored) {
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -248,7 +368,10 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public synchronized void close() throws IOException {
flush();
<<<<<<< HEAD
running =false;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
queue = null;
}
@ -270,6 +393,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
<<<<<<< HEAD
try (ServerSocket runServerSocket = this.serverSocket) {
while (running) {
runServerSocket.setSoTimeout(1000);
@ -285,6 +409,22 @@ public class TcpServerAdapterSpace implements AutoCloseable{
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException e) {
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
=======
try (ServerSocket serverSocket = this.serverSocket) {
while (running) {
serverSocket.setSoTimeout(1000);
serverSocket.setReuseAddress(true);
try {
Socket connectedSocket = serverSocket.accept();
SocketWriter writer = new SocketWriter(queue, connectedSocket);
//TCPServerActivity.this.managedShutdown.add(writer);
Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(true);
writerThread.start();
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException ignored) {
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
}
} catch (IOException e) {

View File

@ -17,27 +17,48 @@
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
<<<<<<< HEAD
=======
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
<<<<<<< HEAD
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
=======
private final LongFunction<TcpServerAdapterSpace> ctxfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final LongFunction<String> outFunction;
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
super(adapter,cmd);
<<<<<<< HEAD
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
=======
this.ctxfunc = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@Override
public TcpServerOp apply(long value) {
<<<<<<< HEAD
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
=======
TcpServerAdapterSpace ctx = ctxfunc.apply(value);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
String output = outFunction.apply(value);
return new TcpServerOp(ctx,output);
}

View File

@ -39,7 +39,11 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
<<<<<<< HEAD
- default: 12345
=======
- default: 8080
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10

View File

@ -51,7 +51,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
- default: 12345
- default: 8080
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10