From 6146d186d911ce5344f93a225c700c595b21d2dc Mon Sep 17 00:00:00 2001 From: Mike Yaacoub Date: Thu, 5 Jan 2023 13:09:53 -0500 Subject: [PATCH] Modified Writer Adapter Behavior for TCPServer to use poll and offer with a specified retry time (1s) --- .../tcpserver/TcpServerAdapterSpace.java | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java index 1ca4c9601..4d33bfe7c 100644 --- a/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java +++ b/adapter-tcp/src/main/java/io/nosqlbench/adapter/tcpserver/TcpServerAdapterSpace.java @@ -20,17 +20,13 @@ import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.Param; -import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.util.SSLKsFactory; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.ServerSocket; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.PrintWriter; import java.io.Writer; import java.net.InetAddress; -import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.io.OutputStream; @@ -39,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.net.ServerSocketFactory; import javax.net.ssl.SSLServerSocketFactory; @@ -173,19 +170,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{ } public static class SocketWriter implements Runnable, Shutdown { private final BlockingQueue sourceQueue; - private final OutputStreamWriter outWriter; + private final Socket connectedSocket; private boolean running = true; public SocketWriter(BlockingQueue sourceQueue, Socket connectedSocket) { this.sourceQueue = sourceQueue; - try { - OutputStream outputStream = connectedSocket.getOutputStream(); - this.outWriter = new OutputStreamWriter(outputStream); - connectedSocket.shutdownInput(); - } catch (Exception e) { - throw new RuntimeException(e); - } + this.connectedSocket = connectedSocket; } public void shutdown() { @@ -194,11 +185,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{ @Override public void run() { - try (Writer runWriter = this.outWriter) { - while (running) { - while (!sourceQueue.isEmpty() ) { + OutputStream outputStream = null; + try { + outputStream = connectedSocket.getOutputStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + try (Writer runWriter = new OutputStreamWriter(outputStream);) { + while (running ) { + if(!sourceQueue.isEmpty()) { try { - String data = sourceQueue.take(); + //String data = sourceQueue.take(); + String data = sourceQueue.poll(1, TimeUnit.SECONDS); + if(data == null) { + continue; + } runWriter.write(data); runWriter.flush(); } catch (Exception e) { @@ -213,9 +214,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{ } catch (Exception e) { throw new RuntimeException(e); } - finally { - //TODO: Add the shutdown logic to do after closing the socket - } } @@ -224,17 +222,23 @@ public class TcpServerAdapterSpace implements AutoCloseable{ public static class QueueWriterAdapter extends Writer { private BlockingQueue queue; + private volatile boolean running = true; + public QueueWriterAdapter(BlockingQueue queue) { this.queue = queue; } @Override public synchronized void write( char[] cbuf, int off, int len) { - while (true) { + String message = new String(cbuf, off, len); + while (running) { try { - queue.put(new String(cbuf, off, len)); - return; + if(queue.offer(message, 1, TimeUnit.SECONDS)) { + return; + } } catch (InterruptedException ignored) { + logger.debug("QueueWriterAdapter was interrupted"); + running =false; } catch (Exception e) { throw new RuntimeException(e); } @@ -243,12 +247,12 @@ public class TcpServerAdapterSpace implements AutoCloseable{ @Override public synchronized void flush() throws IOException { - } @Override public synchronized void close() throws IOException { flush(); + running =false; queue = null; } @@ -280,7 +284,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{ managedShutdown.add(writer); Thread writerThread = new Thread(writer); writerThread.setName("SocketWriter/" + connectedSocket); - writerThread.setDaemon(true); + writerThread.setDaemon(false); writerThread.start(); logger.info("Started writer thread for " + connectedSocket); } catch (SocketTimeoutException e) {