Cleaned up code after review.

This commit is contained in:
Mike Yaacoub
2023-01-05 10:37:00 -05:00
parent bdf61845fd
commit ccc7f7ab3f
7 changed files with 75 additions and 74 deletions

View File

@@ -26,7 +26,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.net.SocketFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
@@ -34,10 +33,9 @@ import java.net.Socket;
public class TcpClientAdapterSpace {
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
private final NBConfiguration config;
Writer writer;
private PrintWriter console;
public TcpClientAdapterSpace(NBConfiguration config) {
this.config = config;
@@ -54,7 +52,7 @@ public class TcpClientAdapterSpace {
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(8080);
int port = config.getOptional(int.class, "port").orElse(12345);
try {
Socket socket = socketFactory.createSocket(host, port);
@@ -78,11 +76,11 @@ public class TcpClientAdapterSpace {
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("host","localhost")
.setDescription("")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",8080)
.setDescription("")
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("filename","tcpclient")
@@ -95,16 +93,18 @@ public class TcpClientAdapterSpace {
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
.setDescription("""
Which format to use.
If provided, the format will override any statement formats provided by the YAML.
If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();

View File

@@ -33,7 +33,6 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
@Service(value= DriverAdapter.class,selector = "tcpclient")
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {

View File

@@ -23,21 +23,21 @@ import java.util.function.LongFunction;
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
private final LongFunction<TcpClientAdapterSpace> ctxfunc;
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxfunc = ctxfunc;
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
}
@Override
public TcpClientOp apply(long value) {
TcpClientAdapterSpace ctx = ctxfunc.apply(value);
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpClientOp(ctx,output);
}

View File

@@ -43,21 +43,7 @@ import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
private final NBConfiguration config;
@@ -66,7 +52,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
private ServerSocket listenerSocket;
private final List<Shutdown> managedShutdown = new ArrayList<>();
private int capacity=10;
private ServerSocketFactory socketFactory;
public TcpServerAdapterSpace(NBConfiguration config) {
this.config = config;
@@ -78,7 +63,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
queue = new LinkedBlockingQueue<>(capacity);
ServerSocketFactory socketFactory;
if (sslEnabled) {
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
@@ -88,7 +73,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
String host = config.getOptional("host").orElse("localhost");
int port = config.getOptional(int.class, "port").orElse(8080);
int port = config.getOptional(int.class, "port").orElse(12345);
if (listenerSocket == null || listenerSocket.isClosed()) {
try {
@@ -113,6 +98,22 @@ public class TcpServerAdapterSpace implements AutoCloseable{
return queueWriterAdapter;
}
@Override
public void close() throws Exception {
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
while(this.queue != null && !this.queue.isEmpty())
{
try {
Thread.sleep(10);
}catch (InterruptedException e) {
}
}
logger.info("TcpServerAdapterSpace is being closed");
for (Shutdown toClose : managedShutdown) {
toClose.shutdown();
}
}
public void writeflush(String text) {
try {
if(this.writer == null)
@@ -130,15 +131,15 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(SSLKsFactory.get().getConfigModel())
.add(
Param.defaultTo("capacity",10)
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
.setDescription("the capacity of the queue")
)
.add(
Param.defaultTo("host","localhost")
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
.setDescription("the host address to use")
)
.add(
Param.defaultTo("port",8080)
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
Param.defaultTo("port",12345)
.setDescription("the designated port to connect to on the socket")
)
.add(
Param.defaultTo("filename","tcpserver")
@@ -151,16 +152,18 @@ public class TcpServerAdapterSpace implements AutoCloseable{
.add(
Param.optional("format")
.setRegex("csv|readout|json|inlinejson|assignments|diag")
.setDescription("Which format to use.\n" +
"If provided, the format will override any statement formats provided by the YAML. " +
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
.setDescription("""
Which format to use.
"If provided, the format will override any statement formats provided by the YAML.
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
)
.add(
Param.defaultTo("bindings","doc")
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
"here as well.")
.setDescription("""
This is a simple way to specify a filter for the names of bindings that you want to use.
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
"here as well.""")
)
.asReadOnly();
@@ -170,17 +173,16 @@ public class TcpServerAdapterSpace implements AutoCloseable{
}
public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue;
private final OutputStream outputStream;
private final OutputStreamWriter writer;
private final OutputStreamWriter outWriter;
private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue;
try {
outputStream = connectedSocket.getOutputStream();
this.writer = new OutputStreamWriter(outputStream);
//connectedSocket.shutdownInput();
OutputStream outputStream = connectedSocket.getOutputStream();
this.outWriter = new OutputStreamWriter(outputStream);
connectedSocket.shutdownInput();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -192,13 +194,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
try (Writer writer = this.writer) {
try (Writer runWriter = this.outWriter) {
while (running) {
while (!sourceQueue.isEmpty() ) {
try {
String data = sourceQueue.take();
writer.write(data);
writer.flush();
runWriter.write(data);
runWriter.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -241,6 +243,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public synchronized void flush() throws IOException {
}
@Override
@@ -267,20 +270,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override
public void run() {
try (ServerSocket serverSocket = this.serverSocket) {
try (ServerSocket runServerSocket = this.serverSocket) {
while (running) {
serverSocket.setSoTimeout(1000);
serverSocket.setReuseAddress(true);
runServerSocket.setSoTimeout(1000);
runServerSocket.setReuseAddress(true);
try {
Socket connectedSocket = serverSocket.accept();
Socket connectedSocket = runServerSocket.accept();
SocketWriter writer = new SocketWriter(queue, connectedSocket);
//TCPServerActivity.this.managedShutdown.add(writer);
managedShutdown.add(writer);
Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(true);
writerThread.start();
logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException ignored) {
} catch (SocketTimeoutException e) {
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
}
}
} catch (IOException e) {

View File

@@ -17,29 +17,27 @@
package io.nosqlbench.adapter.tcpserver;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
private final LongFunction<TcpServerAdapterSpace> ctxfunc;
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
private final LongFunction<String> outFunction;
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
super(adapter,cmd);
this.ctxfunc = ctxfunc;
this.ctxFunction = ctxfunc;
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringfunc;
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
this.outFunction = stringFunction;
}
@Override
public TcpServerOp apply(long value) {
TcpServerAdapterSpace ctx = ctxfunc.apply(value);
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
String output = outFunction.apply(value);
return new TcpServerOp(ctx,output);
}

View File

@@ -39,7 +39,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
- default: 8080
- default: 12345
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10

View File

@@ -51,7 +51,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
- default: localhost
- dynamic: false
- **port** - this is the name of the port to listen on
- default: 8080
- default: 12345
- dynamic: false
- **capacity** - the size of the internal blocking queue
- default: 10