mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Cleaned up code after review.
This commit is contained in:
@@ -26,7 +26,6 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.Writer;
|
||||
@@ -34,10 +33,9 @@ import java.net.Socket;
|
||||
|
||||
public class TcpClientAdapterSpace {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
|
||||
private final static Logger logger = LogManager.getLogger(TcpClientAdapterSpace.class);
|
||||
private final NBConfiguration config;
|
||||
Writer writer;
|
||||
private PrintWriter console;
|
||||
|
||||
public TcpClientAdapterSpace(NBConfiguration config) {
|
||||
this.config = config;
|
||||
@@ -54,7 +52,7 @@ public class TcpClientAdapterSpace {
|
||||
}
|
||||
|
||||
String host = config.getOptional("host").orElse("localhost");
|
||||
int port = config.getOptional(int.class, "port").orElse(8080);
|
||||
int port = config.getOptional(int.class, "port").orElse(12345);
|
||||
|
||||
try {
|
||||
Socket socket = socketFactory.createSocket(host, port);
|
||||
@@ -78,11 +76,11 @@ public class TcpClientAdapterSpace {
|
||||
.add(SSLKsFactory.get().getConfigModel())
|
||||
.add(
|
||||
Param.defaultTo("host","localhost")
|
||||
.setDescription("")
|
||||
.setDescription("the host address to use")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("port",8080)
|
||||
.setDescription("")
|
||||
Param.defaultTo("port",12345)
|
||||
.setDescription("the designated port to connect to on the socket")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("filename","tcpclient")
|
||||
@@ -95,16 +93,18 @@ public class TcpClientAdapterSpace {
|
||||
.add(
|
||||
Param.optional("format")
|
||||
.setRegex("csv|readout|json|inlinejson|assignments|diag")
|
||||
.setDescription("Which format to use.\n" +
|
||||
"If provided, the format will override any statement formats provided by the YAML. " +
|
||||
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
|
||||
.setDescription("""
|
||||
Which format to use.
|
||||
If provided, the format will override any statement formats provided by the YAML.
|
||||
If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("bindings","doc")
|
||||
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
|
||||
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
|
||||
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
|
||||
"here as well.")
|
||||
.setDescription("""
|
||||
This is a simple way to specify a filter for the names of bindings that you want to use.
|
||||
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
|
||||
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
|
||||
"here as well.""")
|
||||
|
||||
)
|
||||
.asReadOnly();
|
||||
|
||||
@@ -33,7 +33,6 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Service(value= DriverAdapter.class,selector = "tcpclient")
|
||||
public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpClientAdapterSpace> implements SyntheticOpTemplateProvider {
|
||||
|
||||
@@ -23,21 +23,21 @@ import java.util.function.LongFunction;
|
||||
|
||||
public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClientAdapterSpace> {
|
||||
|
||||
private final LongFunction<TcpClientAdapterSpace> ctxfunc;
|
||||
private final LongFunction<TcpClientAdapterSpace> ctxFunction;
|
||||
private final LongFunction<String> outFunction;
|
||||
|
||||
public TcpClientOpDispenser(TcpClientDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpClientAdapterSpace> ctxfunc) {
|
||||
super(adapter,cmd);
|
||||
this.ctxfunc = ctxfunc;
|
||||
this.ctxFunction = ctxfunc;
|
||||
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
|
||||
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
|
||||
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
|
||||
this.outFunction = stringfunc;
|
||||
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
|
||||
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
|
||||
this.outFunction = stringFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TcpClientOp apply(long value) {
|
||||
TcpClientAdapterSpace ctx = ctxfunc.apply(value);
|
||||
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
|
||||
String output = outFunction.apply(value);
|
||||
return new TcpClientOp(ctx,output);
|
||||
}
|
||||
|
||||
@@ -43,21 +43,7 @@ import javax.net.ServerSocketFactory;
|
||||
import javax.net.ssl.SSLServerSocketFactory;
|
||||
|
||||
public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
|
||||
while(this.queue != null && !this.queue.isEmpty())
|
||||
{
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
}catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
logger.info("TcpServerAdapterSpace is being closed");
|
||||
for (Shutdown toClose : managedShutdown) {
|
||||
toClose.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(TcpServerAdapterSpace.class);
|
||||
private final NBConfiguration config;
|
||||
@@ -66,7 +52,6 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
private ServerSocket listenerSocket;
|
||||
private final List<Shutdown> managedShutdown = new ArrayList<>();
|
||||
private int capacity=10;
|
||||
private ServerSocketFactory socketFactory;
|
||||
|
||||
public TcpServerAdapterSpace(NBConfiguration config) {
|
||||
this.config = config;
|
||||
@@ -78,7 +63,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
boolean sslEnabled = config.getOptional(Boolean.class, "ssl").orElse(false);
|
||||
this.capacity=config.getOptional(int.class, "capacity").orElse(10);
|
||||
queue = new LinkedBlockingQueue<>(capacity);
|
||||
|
||||
ServerSocketFactory socketFactory;
|
||||
if (sslEnabled) {
|
||||
|
||||
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(config);
|
||||
@@ -88,7 +73,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
}
|
||||
|
||||
String host = config.getOptional("host").orElse("localhost");
|
||||
int port = config.getOptional(int.class, "port").orElse(8080);
|
||||
int port = config.getOptional(int.class, "port").orElse(12345);
|
||||
|
||||
if (listenerSocket == null || listenerSocket.isClosed()) {
|
||||
try {
|
||||
@@ -113,6 +98,22 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
return queueWriterAdapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
logger.info("TcpServerAdapterSpace is waiting for message queue to empty");
|
||||
while(this.queue != null && !this.queue.isEmpty())
|
||||
{
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
}catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
logger.info("TcpServerAdapterSpace is being closed");
|
||||
for (Shutdown toClose : managedShutdown) {
|
||||
toClose.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeflush(String text) {
|
||||
try {
|
||||
if(this.writer == null)
|
||||
@@ -130,15 +131,15 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
.add(SSLKsFactory.get().getConfigModel())
|
||||
.add(
|
||||
Param.defaultTo("capacity",10)
|
||||
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
|
||||
.setDescription("the capacity of the queue")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("host","localhost")
|
||||
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
|
||||
.setDescription("the host address to use")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("port",8080)
|
||||
.setDescription("this is the name of the output file. If 'stdout', output is sent to tcpserver, not a file.")
|
||||
Param.defaultTo("port",12345)
|
||||
.setDescription("the designated port to connect to on the socket")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("filename","tcpserver")
|
||||
@@ -151,16 +152,18 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
.add(
|
||||
Param.optional("format")
|
||||
.setRegex("csv|readout|json|inlinejson|assignments|diag")
|
||||
.setDescription("Which format to use.\n" +
|
||||
"If provided, the format will override any statement formats provided by the YAML. " +
|
||||
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.")
|
||||
.setDescription("""
|
||||
Which format to use.
|
||||
"If provided, the format will override any statement formats provided by the YAML.
|
||||
"If 'diag' is used, a diagnostic readout will be provided for binding constructions.""")
|
||||
)
|
||||
.add(
|
||||
Param.defaultTo("bindings","doc")
|
||||
.setDescription("This is a simple way to specify a filter for the names of bindings that you want to use.\n" +
|
||||
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken\n" +
|
||||
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding\n" +
|
||||
"here as well.")
|
||||
.setDescription("""
|
||||
This is a simple way to specify a filter for the names of bindings that you want to use.
|
||||
"If this is 'doc', then all the document level bindings are used. If it is any other value, it is taken
|
||||
"as a pattern (regex) to subselect a set of bindings by name. You can simply use the name of a binding
|
||||
"here as well.""")
|
||||
|
||||
)
|
||||
.asReadOnly();
|
||||
@@ -170,17 +173,16 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
}
|
||||
public static class SocketWriter implements Runnable, Shutdown {
|
||||
private final BlockingQueue<String> sourceQueue;
|
||||
private final OutputStream outputStream;
|
||||
private final OutputStreamWriter writer;
|
||||
private final OutputStreamWriter outWriter;
|
||||
private boolean running = true;
|
||||
|
||||
|
||||
public SocketWriter(BlockingQueue<String> sourceQueue, Socket connectedSocket) {
|
||||
this.sourceQueue = sourceQueue;
|
||||
try {
|
||||
outputStream = connectedSocket.getOutputStream();
|
||||
this.writer = new OutputStreamWriter(outputStream);
|
||||
//connectedSocket.shutdownInput();
|
||||
OutputStream outputStream = connectedSocket.getOutputStream();
|
||||
this.outWriter = new OutputStreamWriter(outputStream);
|
||||
connectedSocket.shutdownInput();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -192,13 +194,13 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (Writer writer = this.writer) {
|
||||
try (Writer runWriter = this.outWriter) {
|
||||
while (running) {
|
||||
while (!sourceQueue.isEmpty() ) {
|
||||
try {
|
||||
String data = sourceQueue.take();
|
||||
writer.write(data);
|
||||
writer.flush();
|
||||
runWriter.write(data);
|
||||
runWriter.flush();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -241,6 +243,7 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
|
||||
@Override
|
||||
public synchronized void flush() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -267,20 +270,21 @@ public class TcpServerAdapterSpace implements AutoCloseable{
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (ServerSocket serverSocket = this.serverSocket) {
|
||||
try (ServerSocket runServerSocket = this.serverSocket) {
|
||||
while (running) {
|
||||
serverSocket.setSoTimeout(1000);
|
||||
serverSocket.setReuseAddress(true);
|
||||
runServerSocket.setSoTimeout(1000);
|
||||
runServerSocket.setReuseAddress(true);
|
||||
try {
|
||||
Socket connectedSocket = serverSocket.accept();
|
||||
Socket connectedSocket = runServerSocket.accept();
|
||||
SocketWriter writer = new SocketWriter(queue, connectedSocket);
|
||||
//TCPServerActivity.this.managedShutdown.add(writer);
|
||||
managedShutdown.add(writer);
|
||||
Thread writerThread = new Thread(writer);
|
||||
writerThread.setName("SocketWriter/" + connectedSocket);
|
||||
writerThread.setDaemon(true);
|
||||
writerThread.start();
|
||||
logger.info("Started writer thread for " + connectedSocket);
|
||||
} catch (SocketTimeoutException ignored) {
|
||||
} catch (SocketTimeoutException e) {
|
||||
logger.debug("Socket timeout when waiting for a client connection to SocketWriter Server");
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
||||
@@ -17,29 +17,27 @@
|
||||
package io.nosqlbench.adapter.tcpserver;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerAdapterSpace> {
|
||||
|
||||
private final LongFunction<TcpServerAdapterSpace> ctxfunc;
|
||||
private final LongFunction<TcpServerAdapterSpace> ctxFunction;
|
||||
private final LongFunction<String> outFunction;
|
||||
|
||||
public TcpServerOpDispenser(TcpServerDriverAdapter adapter, ParsedOp cmd, LongFunction<TcpServerAdapterSpace> ctxfunc) {
|
||||
super(adapter,cmd);
|
||||
this.ctxfunc = ctxfunc;
|
||||
this.ctxFunction = ctxfunc;
|
||||
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
|
||||
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
|
||||
cmd.enhanceFuncOptionally(stringfunc,"suffix",String.class,(a, b) -> a+b);
|
||||
this.outFunction = stringfunc;
|
||||
LongFunction<String> stringFunction = l -> objectFunction.apply(l).toString();
|
||||
cmd.enhanceFuncOptionally(stringFunction,"suffix",String.class,(a, b) -> a+b);
|
||||
this.outFunction = stringFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TcpServerOp apply(long value) {
|
||||
TcpServerAdapterSpace ctx = ctxfunc.apply(value);
|
||||
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
|
||||
String output = outFunction.apply(value);
|
||||
return new TcpServerOp(ctx,output);
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
|
||||
- default: localhost
|
||||
- dynamic: false
|
||||
- **port** - this is the name of the port to listen on
|
||||
- default: 8080
|
||||
- default: 12345
|
||||
- dynamic: false
|
||||
- **capacity** - the size of the internal blocking queue
|
||||
- default: 10
|
||||
|
||||
@@ -51,7 +51,7 @@ Run a stdout activity named 'stdout-test', with definitions from activities/stdo
|
||||
- default: localhost
|
||||
- dynamic: false
|
||||
- **port** - this is the name of the port to listen on
|
||||
- default: 8080
|
||||
- default: 12345
|
||||
- dynamic: false
|
||||
- **capacity** - the size of the internal blocking queue
|
||||
- default: 10
|
||||
|
||||
Reference in New Issue
Block a user