Revert "nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d"

This reverts commit 1b2655c272.
This commit is contained in:
Mike Yaacoub 2023-01-05 17:43:10 -05:00
parent 1b2655c272
commit 6881c0790a
7 changed files with 1 additions and 226 deletions

View File

@ -26,10 +26,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
<<<<<<< HEAD
=======
import java.io.FileNotFoundException;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
@ -37,16 +33,9 @@ import java.net.Socket;
public class TcpClientAdapterSpace {
<<<<<<< HEAD
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
=======
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
private PrintWriter console;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public TcpClientAdapterSpace(NBConfiguration config) {
this.config = config;
@ -63,11 +52,7 @@ public class TcpClientAdapterSpace {
}
String host = config.getOptional("host").orElse("localhost");
<<<<<<< HEAD
int port = config.getOptional(int.class, "port").orElse(12345);
=======
int port = config.getOptional(int.class, "port").orElse(8080);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
try {
Socket socket = socketFactory.createSocket(host, port);
@ -91,23 +76,11 @@ public class TcpClientAdapterSpace {
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("host","localhost")
<<<<<<< HEAD
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
=======
.setDescription("")
)
.add(
Param.defaultTo("port",8080)
.setDescription("")
)
.add(
Param.defaultTo("filename","tcpclient")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpclient, not a file.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.add(
Param.defaultTo("newline",true)
@ -116,7 +89,6 @@ public class TcpClientAdapterSpace {
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
<<<<<<< HEAD
.setDescription("""
Which format to use.
If provided, the format will override any statement formats provided by the YAML.
@ -129,18 +101,6 @@ public class TcpClientAdapterSpace {
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
=======
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.asReadOnly();

View File

@ -33,10 +33,6 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
<<<<<<< HEAD
=======
import java.util.regex.Pattern;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {

View File

@ -23,37 +23,21 @@ import java.util.function.LongFunction;
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
<<<<<<< HEAD
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
=======
private final LongFunction<TcpClientAdapterSpace> ctxfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final LongFunction<String> outFunction;
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
super(adapter,cmd);
<<<<<<< HEAD
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
=======
this.ctxfunc = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@Override
public TcpClientOp apply(long value) {
<<<<<<< HEAD
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
=======
TcpClientAdapterSpace ctx = ctxfunc.apply(value);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
String output = outFunction.apply(value);
return new TcpClientOp(ctx,output);
}

View File

@ -20,26 +20,13 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
<<<<<<< HEAD
=======
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.ServerSocket;
<<<<<<< HEAD
import java.io.IOException;
import java.io.Writer;
import java.net.InetAddress;
=======
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.io.OutputStream;
@ -48,33 +35,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
<<<<<<< HEAD
import java.util.concurrent.TimeUnit;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
public class TcpServerAdapterSpace implements AutoCloseable{
<<<<<<< HEAD
=======
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
@ -83,10 +49,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
private ServerSocket listenerSocket;
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
<<<<<<< HEAD
=======
private ServerSocketFactory socketFactory;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public TcpServerAdapterSpace(NBConfiguration config) {
this.config = config;
@ -98,11 +60,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
queue = new LinkedBlockingQueue<>(capacity);
<<<<<<< HEAD
ServerSocketFactory socketFactory;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
@ -112,11 +70,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
String host = config.getOptional("host").orElse("localhost");
<<<<<<< HEAD
int port = config.getOptional(int.class, "port").orElse(12345);
=======
int port = config.getOptional(int.class, "port").orElse(8080);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
if (listenerSocket == null || listenerSocket.isClosed()) {
try {
@ -141,7 +95,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
return queueWriterAdapter;
}
<<<<<<< HEAD
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
@ -158,8 +111,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
}
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public void writeflush(String text) {
try {
if(this.writer == null)
@ -177,7 +128,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("capacity",10)
<<<<<<< HEAD
.setDescription("the capacity of the queue")
)
.add(
@ -187,21 +137,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
=======
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("host","localhost")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("port",8080)
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
)
.add(
Param.defaultTo("filename","tcpserver")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.add(
Param.defaultTo("newline",true)
@ -210,7 +145,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
<<<<<<< HEAD
.setDescription("""
Which format to use.
"If provided, the format will override any statement formats provided by the YAML.
@ -223,18 +157,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
=======
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
)
.asReadOnly();
@ -244,28 +166,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue;
<<<<<<< HEAD
private final Socket connectedSocket;
=======
private final OutputStream outputStream;
private final OutputStreamWriter writer;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue;
<<<<<<< HEAD
this.connectedSocket = connectedSocket;
=======
try {
outputStream = connectedSocket.getOutputStream();
this.writer = new OutputStreamWriter(outputStream);
//connectedSocket.shutdownInput();
} catch (Exception e) {
throw new RuntimeException(e);
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
public void shutdown() {
@ -274,7 +181,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
<<<<<<< HEAD
OutputStream outputStream = null;
try {
outputStream = connectedSocket.getOutputStream();
@ -292,15 +198,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
runWriter.write(data);
runWriter.flush();
=======
try (Writer writer = this.writer) {
while (running) {
while (!sourceQueue.isEmpty() ) {
try {
String data = sourceQueue.take();
writer.write(data);
writer.flush();
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -313,12 +210,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} catch (Exception e) {
throw new RuntimeException(e);
}
<<<<<<< HEAD
=======
finally {
//TODO: Add the shutdown logic to do after closing the socket
}
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@ -327,18 +218,14 @@ public class TcpServerAdapterSpace implements AutoCloseable{
public static class QueueWriterAdapter extends Writer {
private BlockingQueue<String> queue;
<<<<<<< HEAD
private volatile boolean running = true;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
public QueueWriterAdapter(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public synchronized void write( char[] cbuf, int off, int len) {
<<<<<<< HEAD
String message = new String(cbuf, off, len);
while (running) {
try {
@ -348,13 +235,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} catch (InterruptedException ignored) {
logger.debug("QueueWriterAdapter was interrupted");
running =false;
=======
while (true) {
try {
queue.put(new String(cbuf, off, len));
return;
} catch (InterruptedException ignored) {
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -368,10 +248,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public synchronized void close() throws IOException {
flush();
<<<<<<< HEAD
running =false;
=======
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
queue = null;
}
@ -393,7 +270,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
<<<<<<< HEAD
try (ServerSocket runServerSocket = this.serverSocket) {
while (running) {
runServerSocket.setSoTimeout(1000);
@ -409,22 +285,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException e) {
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
=======
try (ServerSocket serverSocket = this.serverSocket) {
while (running) {
serverSocket.setSoTimeout(1000);
serverSocket.setReuseAddress(true);
try {
Socket connectedSocket = serverSocket.accept();
SocketWriter writer = new SocketWriter(queue, connectedSocket);
//TCPServerActivity.this.managedShutdown.add(writer);
Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(true);
writerThread.start();
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException ignored) {
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
}
} catch (IOException e) {

View File

@ -17,48 +17,27 @@
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
<<<<<<< HEAD
=======
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
<<<<<<< HEAD
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
=======
private final LongFunction<TcpServerAdapterSpace> ctxfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
private final LongFunction<String> outFunction;
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
super(adapter,cmd);
<<<<<<< HEAD
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
=======
this.ctxfunc = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
}
@Override
public TcpServerOp apply(long value) {
<<<<<<< HEAD
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
=======
TcpServerAdapterSpace ctx = ctxfunc.apply(value);
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
String output = outFunction.apply(value);
return new TcpServerOp(ctx,output);
}

View File

@ -39,11 +39,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
<<<<<<< HEAD
- default: 12345
=======
- default: 8080
>>>>>>> ef397c742 (nosqlbench-440 convert tcpclient driver from ActivityType to DriverAdapter e:1d)
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10

View File

@ -51,7 +51,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
- default: 8080
- default: 12345
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10