From ccc7f7ab3ff144fc294f13807142058cfd0cb58e Mon Sep 17 00:00:00 2001 From: Mike Yaacoub Date: Thu, 5 Jan 2023 10:37:00 -0500 Subject: [PATCH] Cleaned up code after review. --- .../tcpclient/TcpClientAdapterSpace.java | 28 +++--- .../tcpclient/TcpClientDriverAdapter.java | 1 - .../tcpclient/TcpClientOpDispenser.java | 12 +-- .../tcpserver/TcpServerAdapterSpace.java | 90 ++++++++++--------- .../tcpserver/TcpServerOpDispenser.java | 14 ++- adapter-tcp/src/main/resources/tcpclient.md | 2 +- adapter-tcp/src/main/resources/tcpserver.md | 2 +- 7 files changed, 75 insertions(+), 74 deletions(-) diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientAdapterSpace.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientAdapterSpace.java index d0a5e0896..992552b61 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientAdapterSpace.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientAdapterSpace.java @@ -26,7 +26,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import javax.net.SocketFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; @@ -34,10 +33,9 @@ import java.net.Socket; public class TcpClientAdapterSpace { - private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class); + private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class); private final NBConfiguration config; Writer writer; - private PrintWriter console; public TcpClientAdapterSpace(NBConfiguration config) { this.config = config; @@ -54,7 +52,7 @@ public class TcpClientAdapterSpace { } String host = config.getOptional("host").orElse("localhost"); - int port = config.getOptional(int.class, "port").orElse(8080); + int port = config.getOptional(int.class, "port").orElse(12345); try { Socket socket = socketFactory.createSocket(host, port); @@ -78,11 +76,11 @@ public class TcpClientAdapterSpace { .add(SSLKsFactory.get().getConfigModel()) .add( Param.defaultTo("host","localhost") - .setDescription("") + .setDescription("the host address to use") ) .add( - Param.defaultTo("port",8080) - .setDescription("") + Param.defaultTo("port",12345) + .setDescription("the designated port to connect to on the socket") ) .add( Param.defaultTo("filename","tcpclient") @@ -95,16 +93,18 @@ public class TcpClientAdapterSpace { .add( Param.optional("format") .setRegex("csv|readout|json|inlinejson|assignments|diag") - .setDescription("Which format to use.\n" + - "If provided, the format will override any statement formats provided by the YAML. " + - "If 'diag' is used, a diagnostic readout will be provided for binding constructions.") + .setDescription(""" + Which format to use. + If provided, the format will override any statement formats provided by the YAML. + If 'diag' is used, a diagnostic readout will be provided for binding constructions.""") ) .add( Param.defaultTo("bindings","doc") - .setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" + - "If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" + - "as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" + - "here as well.") + .setDescription(""" + This is a simple way to specify a filter for the names of bindings that you want to use. + "If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken + "as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding + "here as well.""") ) .asReadOnly(); diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientDriverAdapter.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientDriverAdapter.java index 520e5ff35..e956d8a0c 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientDriverAdapter.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientDriverAdapter.java @@ -33,7 +33,6 @@ import org.apache.logging.log4j.Logger; import java.util.*; import java.util.function.Function; -import java.util.regex.Pattern; @Service(value= DriverAdapter.class,selector = "tcpclient") public class TcpClientDriverAdapter extends BaseDriverAdapter implements SyntheticOpTemplateProvider { diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientOpDispenser.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientOpDispenser.java index fd5a252f0..0338f6104 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientOpDispenser.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpclient/TcpClientOpDispenser.java @@ -23,21 +23,21 @@ import java.util.function.LongFunction; public class TcpClientOpDispenser extends BaseOpDispenser { - private final LongFunction ctxfunc; + private final LongFunction ctxFunction; private final LongFunction outFunction; public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction ctxfunc) { super(adapter,cmd); - this.ctxfunc = ctxfunc; + this.ctxFunction = ctxfunc; LongFunction objectFunction = cmd.getAsRequiredFunction("stmt", Object.class); - LongFunction stringfunc = l -> objectFunction.apply(l).toString(); - cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b); - this.outFunction = stringfunc; + LongFunction stringFunction = l -> objectFunction.apply(l).toString(); + cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b); + this.outFunction = stringFunction; } @Override public TcpClientOp apply(long value) { - TcpClientAdapterSpace ctx = ctxfunc.apply(value); + TcpClientAdapterSpace ctx = ctxFunction.apply(value); String output = outFunction.apply(value); return new TcpClientOp(ctx,output); } diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java index e94e37fed..1ca4c9601 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java @@ -43,21 +43,7 @@ import javax.net.ServerSocketFactory; import javax.net.ssl.SSLServerSocketFactory; public class TcpServerAdapterSpace implements AutoCloseable{ - @Override - public void close() throws Exception { - logger.info("TcpServerAdapterSpace is waiting for message queue to empty"); - while(this.queue != null && !this.queue.isEmpty()) - { - try { - Thread.sleep(10); - }catch (InterruptedException e) { - } - } - logger.info("TcpServerAdapterSpace is being closed"); - for (Shutdown toClose : managedShutdown) { - toClose.shutdown(); - } - } + private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class); private final NBConfiguration config; @@ -66,7 +52,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{ private ServerSocket listenerSocket; private final List managedShutdown = new ArrayList<>(); private int capacity=10; - private ServerSocketFactory socketFactory; public TcpServerAdapterSpace(NBConfiguration config) { this.config = config; @@ -78,7 +63,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{ boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false); this.capacity=config.getOptional(int.class, "capacity").orElse(10); queue = new LinkedBlockingQueue<>(capacity); - + ServerSocketFactory socketFactory; if (sslEnabled) { NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config); @@ -88,7 +73,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{ } String host = config.getOptional("host").orElse("localhost"); - int port = config.getOptional(int.class, "port").orElse(8080); + int port = config.getOptional(int.class, "port").orElse(12345); if (listenerSocket == null || listenerSocket.isClosed()) { try { @@ -113,6 +98,22 @@ public class TcpServerAdapterSpace implements AutoCloseable{ return queueWriterAdapter; } + @Override + public void close() throws Exception { + logger.info("TcpServerAdapterSpace is waiting for message queue to empty"); + while(this.queue != null && !this.queue.isEmpty()) + { + try { + Thread.sleep(10); + }catch (InterruptedException e) { + } + } + logger.info("TcpServerAdapterSpace is being closed"); + for (Shutdown toClose : managedShutdown) { + toClose.shutdown(); + } + } + public void writeflush(String text) { try { if(this.writer == null) @@ -130,15 +131,15 @@ public class TcpServerAdapterSpace implements AutoCloseable{ .add(SSLKsFactory.get().getConfigModel()) .add( Param.defaultTo("capacity",10) - .setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.") + .setDescription("the capacity of the queue") ) .add( Param.defaultTo("host","localhost") - .setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.") + .setDescription("the host address to use") ) .add( - Param.defaultTo("port",8080) - .setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.") + Param.defaultTo("port",12345) + .setDescription("the designated port to connect to on the socket") ) .add( Param.defaultTo("filename","tcpserver") @@ -151,16 +152,18 @@ public class TcpServerAdapterSpace implements AutoCloseable{ .add( Param.optional("format") .setRegex("csv|readout|json|inlinejson|assignments|diag") - .setDescription("Which format to use.\n" + - "If provided, the format will override any statement formats provided by the YAML. " + - "If 'diag' is used, a diagnostic readout will be provided for binding constructions.") + .setDescription(""" + Which format to use. + "If provided, the format will override any statement formats provided by the YAML. + "If 'diag' is used, a diagnostic readout will be provided for binding constructions.""") ) .add( Param.defaultTo("bindings","doc") - .setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" + - "If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" + - "as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" + - "here as well.") + .setDescription(""" + This is a simple way to specify a filter for the names of bindings that you want to use. + "If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken + "as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding + "here as well.""") ) .asReadOnly(); @@ -170,17 +173,16 @@ public class TcpServerAdapterSpace implements AutoCloseable{ } public static class SocketWriter implements Runnable, Shutdown { private final BlockingQueue sourceQueue; - private final OutputStream outputStream; - private final OutputStreamWriter writer; + private final OutputStreamWriter outWriter; private boolean running = true; public SocketWriter(BlockingQueue sourceQueue, Socket connectedSocket) { this.sourceQueue = sourceQueue; try { - outputStream = connectedSocket.getOutputStream(); - this.writer = new OutputStreamWriter(outputStream); - //connectedSocket.shutdownInput(); + OutputStream outputStream = connectedSocket.getOutputStream(); + this.outWriter = new OutputStreamWriter(outputStream); + connectedSocket.shutdownInput(); } catch (Exception e) { throw new RuntimeException(e); } @@ -192,13 +194,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{ @Override public void run() { - try (Writer writer = this.writer) { + try (Writer runWriter = this.outWriter) { while (running) { while (!sourceQueue.isEmpty() ) { try { String data = sourceQueue.take(); - writer.write(data); - writer.flush(); + runWriter.write(data); + runWriter.flush(); } catch (Exception e) { throw new RuntimeException(e); } @@ -241,6 +243,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{ @Override public synchronized void flush() throws IOException { + } @Override @@ -267,20 +270,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{ @Override public void run() { - try (ServerSocket serverSocket = this.serverSocket) { + try (ServerSocket runServerSocket = this.serverSocket) { while (running) { - serverSocket.setSoTimeout(1000); - serverSocket.setReuseAddress(true); + runServerSocket.setSoTimeout(1000); + runServerSocket.setReuseAddress(true); try { - Socket connectedSocket = serverSocket.accept(); + Socket connectedSocket = runServerSocket.accept(); SocketWriter writer = new SocketWriter(queue, connectedSocket); - //TCPServerActivity.this.managedShutdown.add(writer); + managedShutdown.add(writer); Thread writerThread = new Thread(writer); writerThread.setName("SocketWriter/" + connectedSocket); writerThread.setDaemon(true); writerThread.start(); logger.info("Started writer thread for " + connectedSocket); - } catch (SocketTimeoutException ignored) { + } catch (SocketTimeoutException e) { + logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server"); } } } catch (IOException e) { diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerOpDispenser.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerOpDispenser.java index f5bbc57cc..e32967c0b 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerOpDispenser.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerOpDispenser.java @@ -17,29 +17,27 @@ package io.nosqlbench.adapter.tcpserver; import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; -import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op; import io.nosqlbench.engine.api.templating.ParsedOp; import java.util.function.LongFunction; public class TcpServerOpDispenser extends BaseOpDispenser { - private final LongFunction ctxfunc; + private final LongFunction ctxFunction; private final LongFunction outFunction; public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction ctxfunc) { super(adapter,cmd); - this.ctxfunc = ctxfunc; + this.ctxFunction = ctxfunc; LongFunction objectFunction = cmd.getAsRequiredFunction("stmt", Object.class); - LongFunction stringfunc = l -> objectFunction.apply(l).toString(); - cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b); - this.outFunction = stringfunc; + LongFunction stringFunction = l -> objectFunction.apply(l).toString(); + cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b); + this.outFunction = stringFunction; } @Override public TcpServerOp apply(long value) { - TcpServerAdapterSpace ctx = ctxfunc.apply(value); + TcpServerAdapterSpace ctx = ctxFunction.apply(value); String output = outFunction.apply(value); return new TcpServerOp(ctx,output); } diff --git a/adapter-tcp/src/main/resources/tcpclient.md b/adapter-tcp/src/main/resources/tcpclient.md index dbea6328f..f0eb7777b 100644 --- a/adapter-tcp/src/main/resources/tcpclient.md +++ b/adapter-tcp/src/main/resources/tcpclient.md @@ -39,7 +39,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo - default: localhost - dynamic: false - **port** - this is the name of the port to listen on - - default: 8080 + - default: 12345 - dynamic: false - **capacity** - the size of the internal blocking queue - default: 10 diff --git a/adapter-tcp/src/main/resources/tcpserver.md b/adapter-tcp/src/main/resources/tcpserver.md index 6e8ea132c..80699599d 100644 --- a/adapter-tcp/src/main/resources/tcpserver.md +++ b/adapter-tcp/src/main/resources/tcpserver.md @@ -51,7 +51,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo - default: localhost - dynamic: false - **port** - this is the name of the port to listen on - - default: 8080 + - default: 12345 - dynamic: false - **capacity** - the size of the internal blocking queue - default: 10