From 99dcd84c9e4bf42af16371a77cc5d34745afef6b Mon Sep 17 00:00:00 2001 From: Martin Uhlir Date: Fri, 6 Feb 2015 15:41:38 +0100 Subject: [PATCH] Added option to initiate connection to device - as this feature was missing (according to OF specification) - added integration test Change-Id: I0f7346195999170a74975ee3438bc948fd685e61 Signed-off-by: Martin Uhlir --- .../impl/core/ConnectionInitializer.java | 16 +++ .../core/SwitchConnectionProviderImpl.java | 22 +++- .../impl/core/TcpChannelInitializer.java | 21 +-- .../impl/core/TcpConnectionInitializer.java | 80 +++++++++++ .../protocol/impl/core/TcpHandler.java | 30 +++-- .../protocol/impl/core/TcpHandlerTest.java | 3 +- .../it/integration/IntegrationTest.java | 60 +++++++-- .../protocol/it/integration/MockPlugin.java | 13 +- .../connection/SwitchConnectionProvider.java | 1 - .../impl/clients/ListeningSimpleClient.java | 124 ++++++++++++++++++ 10 files changed, 333 insertions(+), 37 deletions(-) create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ConnectionInitializer.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java create mode 100644 simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/ListeningSimpleClient.java diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ConnectionInitializer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ConnectionInitializer.java new file mode 100644 index 00000000..4959edaa --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ConnectionInitializer.java @@ -0,0 +1,16 @@ +package org.opendaylight.openflowjava.protocol.impl.core; + +/** + * @author martin.uhlir + * + */ +public interface ConnectionInitializer { + + /** + * Initiates connection towards device + * @param host - host IP + * @param port - port number + */ + void initiateConnection(String host, int port); + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java index 9e8a7a18..92f05827 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java @@ -9,6 +9,8 @@ package org.opendaylight.openflowjava.protocol.impl.core; +import io.netty.channel.nio.NioEventLoopGroup; + import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry; @@ -58,7 +60,7 @@ import com.google.common.util.concurrent.SettableFuture; * @author mirehak * @author michal.polkorab */ -public class SwitchConnectionProviderImpl implements SwitchConnectionProvider { +public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer { private static final Logger LOGGER = LoggerFactory .getLogger(SwitchConnectionProviderImpl.class); @@ -69,6 +71,7 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider { private SerializerRegistry serializerRegistry; private DeserializerRegistry deserializerRegistry; private DeserializationFactory deserializationFactory; + private TcpConnectionInitializer connectionInitializer; /** Constructor */ public SwitchConnectionProviderImpl() { @@ -137,7 +140,14 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider { TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol(); if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) { server = new TcpHandler(connConfig.getAddress(), connConfig.getPort()); - ((TcpHandler) server).setChannelInitializer(factory.createPublishingChannelInitializer()); + TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer(); + ((TcpHandler) server).setChannelInitializer(channelInitializer); + ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration()); + + NioEventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup(); + connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler); + connectionInitializer.setChannelInitializer(channelInitializer); + connectionInitializer.run(); } else if (transportProtocol.equals(TransportProtocol.UDP)){ server = new UdpHandler(connConfig.getAddress(), connConfig.getPort()); ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer()); @@ -265,4 +275,10 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider { OFSerializer serializer) { serializerRegistry.registerSerializer(key, serializer); } -} \ No newline at end of file + + @Override + public void initiateConnection(String host, int port) { + connectionInitializer.initiateConnection(host, port); + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpChannelInitializer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpChannelInitializer.java index 07aab354..8282f078 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpChannelInitializer.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpChannelInitializer.java @@ -54,15 +54,18 @@ public class TcpChannelInitializer extends ProtocolChannelInitializer :" + port); - if (!getSwitchConnectionHandler().accept(switchAddress)) { - ch.disconnect(); - LOGGER.debug("Incoming connection rejected"); - return; + if (ch.remoteAddress() != null) { + InetAddress switchAddress = ch.remoteAddress().getAddress(); + int port = ch.localAddress().getPort(); + int remotePort = ch.remoteAddress().getPort(); + LOGGER.debug("Incoming connection from (remote address): " + switchAddress.toString() + + ":" + remotePort + " --> :" + port); + + if (!getSwitchConnectionHandler().accept(switchAddress)) { + ch.disconnect(); + LOGGER.debug("Incoming connection rejected"); + return; + } } LOGGER.debug("Incoming connection accepted - building pipeline"); allChannels.add(ch); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java new file mode 100644 index 00000000..c4b0937e --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java @@ -0,0 +1,80 @@ +package org.opendaylight.openflowjava.protocol.impl.core; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Initializes (TCP) connection to device + * @author martin.uhlir + * + */ +public class TcpConnectionInitializer implements ServerFacade, + ConnectionInitializer { + + private static final Logger LOGGER = LoggerFactory + .getLogger(TcpConnectionInitializer.class); + private EventLoopGroup workerGroup; + private ThreadConfiguration threadConfig; + + private TcpChannelInitializer channelInitializer; + private Bootstrap b; + + /** + * Constructor + * @param workerGroup - shared worker group + */ + public TcpConnectionInitializer(NioEventLoopGroup workerGroup) { + Preconditions.checkNotNull(workerGroup, "WorkerGroup can't be null"); + this.workerGroup = workerGroup; + } + + @Override + public void run() { + b = new Bootstrap(); + b.group(workerGroup).channel(NioSocketChannel.class) + .handler(channelInitializer); + } + + @Override + public ListenableFuture shutdown() { + final SettableFuture result = SettableFuture.create(); + workerGroup.shutdownGracefully(); + return result; + } + + @Override + public ListenableFuture getIsOnlineFuture() { + return null; + } + + @Override + public void setThreadConfig(ThreadConfiguration threadConfig) { + this.threadConfig = threadConfig; + } + + @Override + public void initiateConnection(String host, int port) { + try { + b.connect(host, port).sync(); + } catch (InterruptedException e) { + LOGGER.error("Unable to initiate connection", e); + } + } + + /** + * @param channelInitializer + */ + public void setChannelInitializer(TcpChannelInitializer channelInitializer) { + this.channelInitializer = channelInitializer; + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java index 0c26354e..48d5aa14 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java @@ -82,14 +82,6 @@ public class TcpHandler implements ServerFacade { */ @Override public void run() { - if (threadConfig != null) { - bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount()); - workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount()); - } else { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); - } - /* * We generally do not perform IO-unrelated tasks, so we want to have * all outstanding tasks completed before the executing thread goes @@ -204,4 +196,26 @@ public class TcpHandler implements ServerFacade { public void setThreadConfig(ThreadConfiguration threadConfig) { this.threadConfig = threadConfig; } + + /** + * Initiate event loop groups + * @param threadConfiguration number of threads to be created, if not specified in threadConfig + */ + public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration) { + if (threadConfiguration != null) { + bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount()); + workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount()); + } else { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + } + } + + /** + * @return workerGroup + */ + public NioEventLoopGroup getWorkerGroup() { + return workerGroup; + } + } diff --git a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java index 129a8668..6d103355 100644 --- a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java +++ b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java @@ -123,6 +123,7 @@ public class TcpHandlerTest { try { tcpHandler = new TcpHandler(serverAddress, serverPort); tcpHandler.setChannelInitializer(mockChannelInitializer); + tcpHandler.initiateEventLoopGroups(null); tcpHandler.run(); } catch (Exception e) { if (e instanceof BindException) { @@ -150,7 +151,7 @@ public class TcpHandlerTest { */ private Boolean startupServer() throws InterruptedException, IOException, ExecutionException { ListenableFuture online = tcpHandler.getIsOnlineFuture(); - + tcpHandler.initiateEventLoopGroups(null); (new Thread(tcpHandler)).start(); int retry = 0; while (online.isDone() != true && retry++ < 20) { diff --git a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/IntegrationTest.java b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/IntegrationTest.java index 6464a62f..3eedff52 100644 --- a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/IntegrationTest.java +++ b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/IntegrationTest.java @@ -12,7 +12,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Deque; import java.util.List; - import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -21,6 +20,7 @@ import org.junit.Test; import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration; import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl; import org.opendaylight.openflowjava.protocol.impl.clients.ClientEvent; +import org.opendaylight.openflowjava.protocol.impl.clients.ListeningSimpleClient; import org.opendaylight.openflowjava.protocol.impl.clients.OFClient; import org.opendaylight.openflowjava.protocol.impl.clients.ScenarioFactory; import org.opendaylight.openflowjava.protocol.impl.clients.ScenarioHandler; @@ -58,6 +58,9 @@ public class IntegrationTest { private SwitchConnectionProviderImpl switchConnectionProvider; private ConnectionConfigurationImpl connConfig; + private Thread t; + + private enum ClientType {SIMPLE, LISTENING} /** * @param protocol communication protocol to be used during test * @throws Exception @@ -110,7 +113,7 @@ public class IntegrationTest { int amountOfCLients = 1; Deque scenario = ScenarioFactory.createHandshakeScenario(); ScenarioHandler handler = new ScenarioHandler(scenario); - List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TCP); + List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TCP, ClientType.SIMPLE); OFClient firstClient = clients.get(0); firstClient.getScenarioDone().get(); Thread.sleep(1000); @@ -128,7 +131,7 @@ public class IntegrationTest { int amountOfCLients = 1; Deque scenario = ScenarioFactory.createHandshakeScenario(); ScenarioHandler handler = new ScenarioHandler(scenario); - List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TLS); + List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TLS, ClientType.SIMPLE); OFClient firstClient = clients.get(0); firstClient.getScenarioDone().get(); Thread.sleep(1000); @@ -150,7 +153,7 @@ public class IntegrationTest { scenario.addFirst(new SleepEvent(1000)); scenario.addFirst(new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04"))); ScenarioHandler handler = new ScenarioHandler(scenario); - List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TCP); + List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TCP, ClientType.SIMPLE); OFClient firstClient = clients.get(0); firstClient.getScenarioDone().get(); @@ -171,7 +174,7 @@ public class IntegrationTest { scenario.addFirst(new SleepEvent(1000)); scenario.addFirst(new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04"))); ScenarioHandler handler = new ScenarioHandler(scenario); - List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TLS); + List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.TLS, ClientType.SIMPLE); OFClient firstClient = clients.get(0); firstClient.getScenarioDone().get(); @@ -192,7 +195,7 @@ public class IntegrationTest { scenario.addFirst(new SleepEvent(1000)); scenario.addFirst(new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 03 00 08 00 00 00 04"))); ScenarioHandler handler = new ScenarioHandler(scenario); - List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.UDP); + List clients = createAndStartClient(amountOfCLients, handler, TransportProtocol.UDP, ClientType.SIMPLE); OFClient firstClient = clients.get(0); firstClient.getScenarioDone().get(); @@ -215,23 +218,35 @@ public class IntegrationTest { * @throws ExecutionException if some client could not start */ private List createAndStartClient(int amountOfCLients, ScenarioHandler scenarioHandler, - TransportProtocol protocol) throws ExecutionException { + TransportProtocol protocol, ClientType clientType) throws ExecutionException { List clientsHorde = new ArrayList<>(); for (int i = 0; i < amountOfCLients; i++) { LOGGER.debug("startup address in createclient: " + startupAddress.getHostAddress()); OFClient sc = null; - if (protocol.equals(TransportProtocol.TCP)) { - sc = new SimpleClient(startupAddress.getHostAddress(), port); + if (clientType == ClientType.SIMPLE) { + if (protocol.equals(TransportProtocol.TCP)) { + sc = new SimpleClient(startupAddress.getHostAddress(), port); + sc.setSecuredClient(false); + } else if (protocol.equals(TransportProtocol.TLS)) { + sc = new SimpleClient(startupAddress.getHostAddress(), port); + sc.setSecuredClient(true); + } else { + sc = new UdpSimpleClient(startupAddress.getHostAddress(), port); + } + } else if (clientType == ClientType.LISTENING) { + sc = new ListeningSimpleClient(0); + sc.setScenarioHandler(scenarioHandler); sc.setSecuredClient(false); - } else if (protocol.equals(TransportProtocol.TLS)) { - sc = new SimpleClient(startupAddress.getHostAddress(), port); - sc.setSecuredClient(true); } else { - sc = new UdpSimpleClient(startupAddress.getHostAddress(), port); + LOGGER.error("Unknown type of client."); + throw new IllegalStateException("Unknown type of client."); } + sc.setScenarioHandler(scenarioHandler); clientsHorde.add(sc); - sc.run(); + //sc.run(); + t = new Thread(sc); + t.start(); } for (OFClient sc : clientsHorde) { try { @@ -244,4 +259,21 @@ public class IntegrationTest { return clientsHorde; } + /** + * @throws Exception + */ + @Test + public void testInitiateConnection() throws Exception { + setUp(TransportProtocol.TCP); + + Deque scenario = ScenarioFactory.createHandshakeScenario(); + ScenarioHandler handler = new ScenarioHandler(scenario); + List clients = createAndStartClient(1, handler, TransportProtocol.TCP, ClientType.LISTENING); + OFClient ofClient = clients.get(0); + ofClient.getIsOnlineFuture().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + int listeningClientPort = ((ListeningSimpleClient) ofClient).getPort(); + mockPlugin.initiateConnection(switchConnectionProvider, "localhost", listeningClientPort); + ofClient.getScenarioDone().get(); + LOGGER.debug("testInitiateConnection() Finished") ; + } } diff --git a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/MockPlugin.java b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/MockPlugin.java index f8298bbf..e83bac83 100644 --- a/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/MockPlugin.java +++ b/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/MockPlugin.java @@ -18,6 +18,8 @@ import java.util.concurrent.TimeoutException; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; +import org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderImpl; +import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage; @@ -231,5 +233,14 @@ public class MockPlugin implements OpenflowProtocolListener, SwitchConnectionHan LOGGER.trace("MockPlugin().onConnectionReady()"); } - + /** + * Initiates connection to device + * @param switchConnectionProvider + * @param host - host IP + * @param port - port number + */ + public void initiateConnection(SwitchConnectionProviderImpl switchConnectionProvider, String host, int port) { + LOGGER.trace("MockPlugin().initiateConnection()"); + switchConnectionProvider.initiateConnection(host, port); + } } diff --git a/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java b/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java index b57500b4..c6910b14 100644 --- a/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java +++ b/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java @@ -46,5 +46,4 @@ public interface SwitchConnectionProvider extends AutoCloseable, * @param switchConHandler instance being informed when new switch connects */ void setSwitchConnectionHandler(SwitchConnectionHandler switchConHandler); - } \ No newline at end of file diff --git a/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/ListeningSimpleClient.java b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/ListeningSimpleClient.java new file mode 100644 index 00000000..0288745f --- /dev/null +++ b/simple-client/src/main/java/org/opendaylight/openflowjava/protocol/impl/clients/ListeningSimpleClient.java @@ -0,0 +1,124 @@ +package org.opendaylight.openflowjava.protocol.impl.clients; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.Future; + +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.SettableFuture; + +/** + * Listening client for testing purposes + * @author martin.uhlir + * + */ +public class ListeningSimpleClient implements OFClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ListeningSimpleClient.class); + private int port; + private boolean securedClient = false; + private EventLoopGroup workerGroup; + private SettableFuture isOnlineFuture; + private SettableFuture scenarioDone; + private ScenarioHandler scenarioHandler; + + /** + * Constructor of the class + * + * @param host address of host + * @param port host listening port + */ + public ListeningSimpleClient(int port) { + this.port = port; + init(); + } + + private void init() { + isOnlineFuture = SettableFuture.create(); + scenarioDone = SettableFuture.create(); + } + + /** + * Starting class of {@link ListeningSimpleClient} + */ + @Override + public void run() { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + SimpleClientInitializer clientInitializer = new SimpleClientInitializer(isOnlineFuture, securedClient); + clientInitializer.setScenario(scenarioHandler); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(clientInitializer); + + ChannelFuture f = b.bind(port).sync(); + // Update port, as it may have been specified as 0 + this.port = ((InetSocketAddress) f.channel().localAddress()).getPort(); + isOnlineFuture.set(true); + + synchronized (scenarioHandler) { + LOGGER.debug("WAITING FOR SCENARIO"); + while (! scenarioHandler.isScenarioFinished()) { + scenarioHandler.wait(); + } + } + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + } finally { + LOGGER.debug("listening client shutting down"); + try { + workerGroup.shutdownGracefully().get(); + bossGroup.shutdownGracefully().get(); + LOGGER.debug("listening client shutdown succesful"); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error(e.getMessage(), e); + } + } + scenarioDone.set(true); + } + + /** + * @return close future + */ + public Future disconnect() { + LOGGER.debug("disconnecting client"); + return workerGroup.shutdownGracefully(); + } + + @Override + public void setSecuredClient(boolean securedClient) { + this.securedClient = securedClient; + } + + @Override + public SettableFuture getIsOnlineFuture() { + return isOnlineFuture; + } + + @Override + public SettableFuture getScenarioDone() { + return scenarioDone; + } + + @Override + public void setScenarioHandler(ScenarioHandler scenario) { + this.scenarioHandler = scenario; + } + + /** + * @return actual port number + */ + public int getPort() { + return this.port; + } +} \ No newline at end of file -- 2.36.6