diff --git a/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivity.java b/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivity.java deleted file mode 100644 index 998d94b44..000000000 --- a/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivity.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright (c) 2022 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.activitytype.tcpserver; - -import io.nosqlbench.activitytype.stdout.StdoutActivity; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.util.SSLKsFactory; -import io.nosqlbench.nb.api.config.standard.NBConfiguration; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import javax.net.ServerSocketFactory; -import javax.net.ssl.SSLServerSocketFactory; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - - -public class TCPServerActivity extends StdoutActivity { - - private final static Logger logger = LogManager.getLogger(TCPServerActivity.class); - private final ServerSocketFactory socketFactory; - private final LinkedBlockingQueue queue; - private ServerSocket listenerSocket; - private final List managedShutdown = new ArrayList<>(); - private int capacity=10; - - - public TCPServerActivity(ActivityDef activityDef) { - super(activityDef); - boolean sslEnabled = activityDef.getParams().getOptionalBoolean("ssl").orElse(false); - this.capacity=activityDef.getParams().getOptionalInteger("capacity").orElse(10); - queue = new LinkedBlockingQueue<>(capacity); - - if (sslEnabled) { - - NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(activityDef.getParams()); - socketFactory = SSLKsFactory.get().createSSLServerSocketFactory(sslCfg); - } else { - socketFactory = ServerSocketFactory.getDefault(); - } - } - - @Override - public void onActivityDefUpdate(ActivityDef activityDef) { - super.onActivityDefUpdate(activityDef); - } - - @Override - public void shutdownActivity() { - super.shutdownActivity(); - for (Shutdown toClose : managedShutdown) { - toClose.shutdown(); - } - } - - @Override - public synchronized void write(String statement) { - while (true) { - try { - queue.put(statement); - return; - } catch (InterruptedException ignored) { - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - @Override - protected synchronized Writer createPrintWriter() { - - String host = getActivityDef().getParams().getOptionalString("host").orElse("localhost"); - int port = getActivityDef().getParams().getOptionalInteger("port").orElse(12345); - - if (listenerSocket == null || listenerSocket.isClosed()) { - try { - InetAddress hostAddr = InetAddress.getByName(host); - listenerSocket = socketFactory.createServerSocket(port, 10, hostAddr); - if (socketFactory instanceof SSLServerSocketFactory) { - logger.info("SSL enabled on server socket " + listenerSocket); - } - SocketAcceptor socketAcceptor = new SocketAcceptor(queue, listenerSocket); - managedShutdown.add(socketAcceptor); - Thread acceptorThread = new Thread(socketAcceptor); - acceptorThread.setDaemon(true); - acceptorThread.setName("Listener/" + listenerSocket); - acceptorThread.start(); - } catch (IOException e) { - throw new RuntimeException("Error listening on listenerSocket:" + e, e); - } - } - - QueueWriterAdapter queueWriterAdapter = new QueueWriterAdapter(this.queue); - logger.info("initialized queue writer:" + queueWriterAdapter); - return queueWriterAdapter; - - } - - private interface Shutdown { - void shutdown(); - } - - public static class SocketWriter implements Runnable, Shutdown { - private final BlockingQueue sourceQueue; - private final OutputStream outputStream; - private final OutputStreamWriter writer; - private boolean running = true; - - - public SocketWriter(BlockingQueue sourceQueue, Socket connectedSocket) { - this.sourceQueue = sourceQueue; - try { - outputStream = connectedSocket.getOutputStream(); - this.writer = new OutputStreamWriter(outputStream); - //connectedSocket.shutdownInput(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void shutdown() { - this.running = false; - } - - @Override - public void run() { - try (Writer writer = this.writer) { - while (true) { - while (!sourceQueue.isEmpty() || running) { - try { - String data = sourceQueue.take(); - writer.write(data); - writer.flush(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - } - - public static class QueueWriterAdapter extends Writer { - private BlockingQueue queue; - - public QueueWriterAdapter(BlockingQueue queue) { - this.queue = queue; - } - - @Override - public synchronized void write( char[] cbuf, int off, int len) { - while (true) { - try { - queue.put(new String(cbuf, off, len)); - return; - } catch (InterruptedException ignored) { - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - @Override - public synchronized void flush() throws IOException { - } - - @Override - public synchronized void close() throws IOException { - flush(); - queue = null; - } - - } - - public class SocketAcceptor implements Runnable, Shutdown { - private final BlockingQueue queue; - private final ServerSocket serverSocket; - private boolean running = true; - - public SocketAcceptor(BlockingQueue queue, ServerSocket serverSocket) { - this.queue = queue; - this.serverSocket = serverSocket; - } - - public void shutdown() { - this.running = false; - } - - @Override - public void run() { - try (ServerSocket serverSocket = this.serverSocket) { - while (running) { - serverSocket.setSoTimeout(1000); - serverSocket.setReuseAddress(true); - try { - Socket connectedSocket = serverSocket.accept(); - SocketWriter writer = new SocketWriter(queue, connectedSocket); - TCPServerActivity.this.managedShutdown.add(writer); - Thread writerThread = new Thread(writer); - writerThread.setName("SocketWriter/" + connectedSocket); - writerThread.setDaemon(true); - writerThread.start(); - logger.info("Started writer thread for " + connectedSocket); - } catch (SocketTimeoutException ignored) { - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - -} - - diff --git a/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivityType.java b/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivityType.java deleted file mode 100644 index 45c71c946..000000000 --- a/adapter-tcp/src/main/java/io/nosqlbench/activitytype/tcpserver/TCPServerActivityType.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2022 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.activitytype.tcpserver; - -import io.nosqlbench.activitytype.stdout.StdoutAction; -import io.nosqlbench.activitytype.stdout.StdoutActivity; -import io.nosqlbench.engine.api.activityapi.core.Action; -import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; -import io.nosqlbench.engine.api.activityapi.core.ActivityType; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.nb.annotations.Service; - -@Service(value= ActivityType.class, selector="tcpserver") -public class TCPServerActivityType implements ActivityType { - - @Override - public TCPServerActivity getActivity(ActivityDef activityDef) { - return new TCPServerActivity(activityDef); - } - - @Override - public ActionDispenser getActionDispenser(TCPServerActivity activity) { - return new Dispenser(activity); - } - - private static class Dispenser implements ActionDispenser { - private final StdoutActivity activity; - - private Dispenser(StdoutActivity activity) { - this.activity = activity; - } - - @Override - public Action getAction(int slot) { - return new StdoutAction(slot,this.activity); - } - } -}