Modified Writer Adapter Behavior for TCPServer to use poll and offer with a specified retry time (1s)

This commit is contained in:
Mike Yaacoub 2023-01-05 13:09:53 -05:00
parent ccc7f7ab3f
commit 6146d186d9

View File

@ -20,17 +20,13 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param; import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.util.SSLKsFactory; import io.nosqlbench.api.engine.util.SSLKsFactory;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer; import java.io.Writer;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.io.OutputStream; import java.io.OutputStream;
@ -39,6 +35,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory; import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.SSLServerSocketFactory;
@ -173,19 +170,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} }
public static class SocketWriter implements Runnable, Shutdown { public static class SocketWriter implements Runnable, Shutdown {
private final BlockingQueue<String> sourceQueue; private final BlockingQueue<String> sourceQueue;
private final OutputStreamWriter outWriter; private final Socket connectedSocket;
private boolean running = true; private boolean running = true;
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) { public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
this.sourceQueue = sourceQueue; this.sourceQueue = sourceQueue;
try { this.connectedSocket = connectedSocket;
OutputStream outputStream = connectedSocket.getOutputStream();
this.outWriter = new OutputStreamWriter(outputStream);
connectedSocket.shutdownInput();
} catch (Exception e) {
throw new RuntimeException(e);
}
} }
public void shutdown() { public void shutdown() {
@ -194,11 +185,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override @Override
public void run() { public void run() {
try (Writer runWriter = this.outWriter) { OutputStream outputStream = null;
while (running) { try {
while (!sourceQueue.isEmpty() ) { outputStream = connectedSocket.getOutputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
try (Writer runWriter = new OutputStreamWriter(outputStream);) {
while (running ) {
if(!sourceQueue.isEmpty()) {
try { try {
String data = sourceQueue.take(); //String data = sourceQueue.take();
String data = sourceQueue.poll(1, TimeUnit.SECONDS);
if(data == null) {
continue;
}
runWriter.write(data); runWriter.write(data);
runWriter.flush(); runWriter.flush();
} catch (Exception e) { } catch (Exception e) {
@ -213,9 +214,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
finally {
//TODO: Add the shutdown logic to do after closing the socket
}
} }
@ -224,17 +222,23 @@ public class TcpServerAdapterSpace implements AutoCloseable{
public static class QueueWriterAdapter extends Writer { public static class QueueWriterAdapter extends Writer {
private BlockingQueue<String> queue; private BlockingQueue<String> queue;
private volatile boolean running = true;
public QueueWriterAdapter(BlockingQueue<String> queue) { public QueueWriterAdapter(BlockingQueue<String> queue) {
this.queue = queue; this.queue = queue;
} }
@Override @Override
public synchronized void write( char[] cbuf, int off, int len) { public synchronized void write( char[] cbuf, int off, int len) {
while (true) { String message = new String(cbuf, off, len);
while (running) {
try { try {
queue.put(new String(cbuf, off, len)); if(queue.offer(message, 1, TimeUnit.SECONDS)) {
return; return;
}
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
logger.debug("QueueWriterAdapter was interrupted");
running =false;
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -243,12 +247,12 @@ public class TcpServerAdapterSpace implements AutoCloseable{
@Override @Override
public synchronized void flush() throws IOException { public synchronized void flush() throws IOException {
} }
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
flush(); flush();
running =false;
queue = null; queue = null;
} }
@ -280,7 +284,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
managedShutdown.add(writer); managedShutdown.add(writer);
Thread writerThread = new Thread(writer); Thread writerThread = new Thread(writer);
writerThread.setName("SocketWriter/" + connectedSocket); writerThread.setName("SocketWriter/" + connectedSocket);
writerThread.setDaemon(true); writerThread.setDaemon(false);
writerThread.start(); writerThread.start();
logger.info("Started writer thread for " + connectedSocket); logger.info("Started writer thread for " + connectedSocket);
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {