Refactor ServerFacade 54/111854/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 23 May 2024 22:44:24 +0000 (00:44 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 26 May 2024 20:52:19 +0000 (22:52 +0200)
ServerFacade and its two implementations have intrinsic state in their
associated groups.

The API and interactions are rather involved, including a threadpool
holding a thread, which is waiting for channel shutdown.

This completely refactors things:
- ServerFacade is an abstract class with a well-separated public and
  private APIs
- {Tcp,Udp}ServerFacade are replacements for {Tcp,Udp}Handler, which
  manage the lifecycle
- instantiation is completely asynchronous, so that we know at all times
  what we need to clean up and what is fine

Change-Id: I82952b5c3e54a9b9067f6d28bc6e9072373b146b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit ce1a49362ddb70df372798f0ed5c1fbfc39d83a4)

15 files changed:
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OnlineProvider.java [deleted file]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ServerFacade.java
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ShutdownProvider.java [deleted file]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java [deleted file]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java [deleted file]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacade.java [new file with mode: 0644]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpHandler.java [deleted file]
openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacade.java [new file with mode: 0644]
openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java [deleted file]
openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacadeTest.java [new file with mode: 0644]
openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacadeTest.java [new file with mode: 0644]
openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/SwitchConnectionProviderImpl02Test.java
openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/UdpHandlerTest.java [deleted file]
openflowjava/openflow-protocol-it/src/test/java/org/opendaylight/openflowjava/protocol/it/integration/IntegrationTest.java

diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OnlineProvider.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OnlineProvider.java
deleted file mode 100644 (file)
index 370ea16..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * OnlineProvider.
- *
- * @author mirehak
- */
-public interface OnlineProvider {
-
-    ListenableFuture<Void> getIsOnlineFuture();
-}
index 57333a375e25cc26e418464642b4c24719d6850c..c864f0a5081f686cfeb15bbe23ae51e7febf655e 100644 (file)
@@ -1,5 +1,6 @@
 /*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2013 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,12 +8,60 @@
  */
 package org.opendaylight.openflowjava.protocol.impl.core;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.EventLoopGroup;
+import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+
 /**
  * Server facade interface.
- *
- * @author mirehak
  */
-public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable {
-    @Override
-    void run();
+public abstract class ServerFacade {
+    private final @NonNull SettableFuture<Void> shutdownFuture = SettableFuture.create();
+    private final @NonNull InetSocketAddress localAddress;
+
+    @GuardedBy("this")
+    private EventLoopGroup group;
+
+    ServerFacade(final EventLoopGroup group, final InetSocketAddress localAddress) {
+        this.localAddress = requireNonNull(localAddress);
+        this.group = requireNonNull(group);
+
+        // Hook onto group shutting down -- that's when we know shutdownFuture is completed
+        group.terminationFuture().addListener(downResult -> {
+            final var cause = downResult.cause();
+            if (cause != null) {
+                shutdownFuture.setException(cause);
+            } else {
+                shutdownFuture.set(null);
+            }
+        });
+    }
+
+    /**
+     * Returns the local address.
+     *
+     * @return the local address
+     */
+    public final @NonNull InetSocketAddress localAddress() {
+        return localAddress;
+    }
+
+    /**
+     * Shuts down this facade. If this facade was already shut down, this method does nothing.
+     *
+     * @return a future completing when the facade has been shut down
+     */
+    synchronized @NonNull ListenableFuture<Void> shutdown() {
+        final var local = group;
+        if (local != null) {
+            group = null;
+            local.shutdownGracefully();
+        }
+        return shutdownFuture;
+    }
 }
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ShutdownProvider.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/ShutdownProvider.java
deleted file mode 100644 (file)
index 34717e2..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * Shutdown provider interface.
- *
- * @author mirehak
- */
-public interface ShutdownProvider {
-
-    ListenableFuture<Void> shutdown();
-}
index d483830f4f657a321476e3d5c5144cf9b2f7a7d2..ead6e06e42ad19eda4098a0f1f61fc782ababa2e 100755 (executable)
@@ -7,19 +7,20 @@
  */
 package org.opendaylight.openflowjava.protocol.impl.core;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.epoll.Epoll;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.infrautils.diagstatus.DiagStatusService;
 import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
 import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
 import org.opendaylight.infrautils.diagstatus.ServiceState;
-import org.opendaylight.infrautils.utils.concurrent.Executors;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
@@ -73,8 +74,6 @@ import org.slf4j.LoggerFactory;
 @Component(service = SwitchConnectionProvider.class, factory = SwitchConnectionProviderImpl.FACTORY_NAME)
 public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
-
-    private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
     private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
 
     // OSGi DS Component Factory name
@@ -87,14 +86,12 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     private final SerializerRegistry serializerRegistry;
     private final DeserializerRegistry deserializerRegistry;
     private final DeserializationFactory deserializationFactory;
-    private final ListeningExecutorService listeningExecutorService;
     private final String diagStatusIdentifier;
-    private final String threadName;
 
-    private TcpConnectionInitializer connectionInitializer;
-    private ServerFacade serverFacade;
-    // FIXME: clean this up when no longer needed
-    private final ServiceRegistration diagReg;
+    @GuardedBy("this")
+    private ListenableFuture<? extends ServerFacade> serverFacade;
+    @GuardedBy("this")
+    private ServiceRegistration diagReg;
 
     public SwitchConnectionProviderImpl(final DiagStatusService diagStatus,
             final @Nullable ConnectionConfiguration connConfig) {
@@ -103,8 +100,6 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
         diagReg = diagStatus.register(diagStatusIdentifier);
 
-        threadName = THREAD_NAME_PREFIX + connectionSuffix;
-        listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
         serializerRegistry = new SerializerRegistryImpl();
         if (connConfig != null) {
             serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
@@ -124,9 +119,17 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
 
     @Override
     @Deactivate
-    public void close() {
-        shutdown();
-        diagReg.close();
+    public synchronized void close() throws InterruptedException, ExecutionException {
+        final var local = diagReg;
+        if (local != null) {
+            diagReg = null;
+            final var future = serverFacade;
+            if (future != null) {
+                shutdownFacade(future).addListener(local::close, MoreExecutors.directExecutor());
+            } else {
+                local.close();
+            }
+        }
     }
 
     // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
@@ -137,52 +140,76 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     @Override
     public ListenableFuture<Void> shutdown() {
         LOG.debug("Shutdown summoned");
-        if (serverFacade == null) {
-            LOG.warn("Can not shutdown - not configured or started");
+        final ListenableFuture<? extends ServerFacade> local;
+        synchronized (this) {
+            if (diagReg == null) {
+                return Futures.immediateVoidFuture();
+            }
+            local = serverFacade;
+        }
+        if (local == null) {
             throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
         }
-        final var serverFacadeShutdownFuture = serverFacade.shutdown();
-        Executors.shutdownAndAwaitTermination(listeningExecutorService);
-        return serverFacadeShutdownFuture;
+        return shutdownFacade(local);
+    }
+
+    private ListenableFuture<Void> shutdownFacade(final ListenableFuture<? extends ServerFacade> future) {
+        return Futures.transformAsync(future, facade -> {
+            final var shutdownFuture = facade.shutdown();
+            shutdownFuture.addListener(() -> removeFacade(future), MoreExecutors.directExecutor());
+            return shutdownFuture;
+        }, MoreExecutors.directExecutor());
+    }
+
+    private synchronized void removeFacade(final ListenableFuture<? extends ServerFacade> expected) {
+        if (expected == serverFacade) {
+            serverFacade = null;
+            diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR, "Terminated"));
+        }
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public ListenableFuture<Void> startup(final SwitchConnectionHandler connectionHandler) {
+    public synchronized ListenableFuture<Void> startup(final SwitchConnectionHandler connectionHandler) {
         LOG.debug("Startup summoned");
+
         if (connConfig == null) {
             return Futures.immediateFailedFuture(new IllegalStateException("Connection not configured"));
         }
         if (connectionHandler == null) {
             return Futures.immediateFailedFuture(new IllegalStateException("SwitchConnectionHandler is not set"));
         }
-
-        try {
-            serverFacade = createAndConfigureServer(connectionHandler);
-            Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
-                @Override
-                public void onFailure(final Throwable throwable) {
-                    diagReg.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
-                }
-
-                @Override
-                public void onSuccess(final Object result) {
-                    diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR,
-                        threadName + " terminated"));
-                }
-            }, MoreExecutors.directExecutor());
-            return serverFacade.getIsOnlineFuture();
-        } catch (RuntimeException e) {
-            return Futures.immediateFailedFuture(e);
+        if (serverFacade != null) {
+            return Futures.immediateFailedFuture(new IllegalStateException("Provider already started"));
         }
+
+        final var future = createAndConfigureServer(connectionHandler);
+        serverFacade = future;
+        Futures.addCallback(future, new FutureCallback<ServerFacade>() {
+            @Override
+            public void onSuccess(final ServerFacade result) {
+                diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL));
+                LOG.info("Started {} connection on {}", connConfig.getTransferProtocol(), result.localAddress());
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.error("Failed to start {} connection on {}:{}", connConfig.getTransferProtocol(),
+                    connConfig.getAddress(), connConfig.getPort(), cause);
+                diagReg.report(new ServiceDescriptor(diagStatusIdentifier, cause));
+            }
+        }, MoreExecutors.directExecutor());
+
+        return Futures.transform(future, facade -> null, MoreExecutors.directExecutor());
     }
 
-    private ServerFacade createAndConfigureServer(final SwitchConnectionHandler connectionHandler) {
+    private ListenableFuture<? extends ServerFacade> createAndConfigureServer(
+            final SwitchConnectionHandler connectionHandler) {
         LOG.debug("Configuring ..");
-
         final var transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
         if (transportProtocol == null) {
-            throw new IllegalStateException("No transport protocol received in " + connConfig);
+            return Futures.immediateFailedFuture(
+                new IllegalStateException("No transport protocol received in " + connConfig));
         }
 
         final var factory = new ChannelInitializerFactory();
@@ -198,29 +225,23 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         boolean isEpollEnabled = Epoll.isAvailable();
 
         return switch (transportProtocol) {
-            case TCP, TLS -> {
-                final var tcpHandler = new TcpHandler(connConfig.getAddress(), connConfig.getPort(),
-                    () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
-                final var channelInitializer = factory.createPublishingChannelInitializer();
-                tcpHandler.setChannelInitializer(channelInitializer);
-                tcpHandler.initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
-                final var workerGroupFromTcpHandler = tcpHandler.getWorkerGroup();
-                connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, channelInitializer,
-                    isEpollEnabled);
-                yield tcpHandler;
-            }
-            case UDP -> {
-                final var udpHandler = new UdpHandler(connConfig.getAddress(), connConfig.getPort(),
-                    () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
-                udpHandler.initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
-                udpHandler.setChannelInitializer(factory.createUdpChannelInitializer());
-                yield udpHandler;
-            }
+            case TCP, TLS -> TcpServerFacade.start(connConfig, isEpollEnabled,
+                factory.createPublishingChannelInitializer());
+            case UDP -> UdpServerFacade.start(connConfig, isEpollEnabled, factory.createUdpChannelInitializer());
         };
     }
 
+    @VisibleForTesting
     public ServerFacade getServerFacade() {
-        return serverFacade;
+        final ListenableFuture<? extends ServerFacade> future;
+        synchronized (this) {
+            future = serverFacade;
+        }
+        try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException("Failed to acquire facade", e);
+        }
     }
 
     @Override
@@ -347,7 +368,12 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
 
     @Override
     public void initiateConnection(final String host, final int port) {
-        connectionInitializer.initiateConnection(host, port);
+        final var facade = getServerFacade();
+        if (facade instanceof ConnectionInitializer initializer) {
+            initializer.initiateConnection(host, port);
+        } else {
+            throw new UnsupportedOperationException(facade + " does not support connections");
+        }
     }
 
     @Override
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpConnectionInitializer.java
deleted file mode 100644 (file)
index 7272302..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Initializes (TCP) connection to device.
- *
- * @author martin.uhlir
- */
-final class TcpConnectionInitializer implements ConnectionInitializer {
-    private static final Logger LOG = LoggerFactory.getLogger(TcpConnectionInitializer.class);
-
-    private final Bootstrap bootstrap;
-
-    TcpConnectionInitializer(final EventLoopGroup workerGroup, final TcpChannelInitializer channelInitializer,
-            final boolean isEpollEnabled) {
-        bootstrap = new Bootstrap()
-            .group(requireNonNull(workerGroup, "WorkerGroup cannot be null"))
-            .handler(channelInitializer)
-            .channel(isEpollEnabled ? EpollSocketChannel.class : NioSocketChannel.class);
-    }
-
-    @Override
-    public void initiateConnection(final String host, final int port) {
-        try {
-            bootstrap.connect(host, port).sync();
-        } catch (InterruptedException e) {
-            LOG.error("Unable to initiate connection", e);
-        }
-    }
-}
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java
deleted file mode 100644 (file)
index 97fe5ab..0000000
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.ServerSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class implementing server over TCP / TLS for handling incoming connections.
- *
- * @author michal.polkorab
- */
-public class TcpHandler implements ServerFacade {
-    /*
-     * High/low write watermarks
-     */
-    private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
-    private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
-    /*
-     * Write spin count. This tells netty to immediately retry a non-blocking
-     * write this many times before moving on to selecting.
-     */
-    private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
-
-    private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
-
-    private final SettableFuture<Void> isOnlineFuture = SettableFuture.create();
-    private final InetAddress startupAddress;
-    private final Runnable readyRunnable;
-
-    private int port;
-    private String address;
-    private EventLoopGroup workerGroup;
-    private EventLoopGroup bossGroup;
-
-    private TcpChannelInitializer channelInitializer;
-
-    private Class<? extends ServerSocketChannel> socketChannelClass;
-
-    /**
-     * Constructor of TCPHandler that listens on selected port.
-     *
-     * @param port listening port of TCPHandler server
-     */
-    public TcpHandler(final int port, final Runnable readyRunnable) {
-        this(null, port, readyRunnable);
-    }
-
-    /**
-     * Constructor of TCPHandler that listens on selected address and port.
-     * @param address listening address of TCPHandler server
-     * @param port listening port of TCPHandler server
-     */
-    public TcpHandler(final InetAddress address, final int port, final Runnable readyRunnable) {
-        this.port = port;
-        startupAddress = address;
-        this.readyRunnable = readyRunnable;
-    }
-
-    /**
-     * Starts server on selected port.
-     */
-    @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void run() {
-        /*
-         * We generally do not perform IO-unrelated tasks, so we want to have
-         * all outstanding tasks completed before the executing thread goes
-         * back into select.
-         *
-         * Any other setting means netty will measure the time it spent selecting
-         * and spend roughly proportional time executing tasks.
-         */
-        //workerGroup.setIoRatio(100);
-
-        final ChannelFuture f;
-        try {
-            ServerBootstrap bootstrap = new ServerBootstrap();
-            bootstrap.group(bossGroup, workerGroup)
-                    .channel(socketChannelClass)
-                    .handler(new LoggingHandler(LogLevel.DEBUG))
-                    .childHandler(channelInitializer)
-                    .option(ChannelOption.SO_BACKLOG, 128)
-                    .option(ChannelOption.SO_REUSEADDR, true)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true)
-                    .childOption(ChannelOption.TCP_NODELAY , true)
-                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-                    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                            new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
-                    .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
-
-            if (startupAddress != null) {
-                f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
-            } else {
-                f = bootstrap.bind(port).sync();
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while binding port {}", port, e);
-            return;
-        } catch (Throwable throwable) {
-            // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
-            LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
-            throw throwable;
-        }
-
-        try {
-            InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
-            address = isa.getHostString();
-
-            // Update port, as it may have been specified as 0
-            port = isa.getPort();
-
-            LOG.debug("address from tcphandler: {}", address);
-            LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
-            readyRunnable.run();
-            isOnlineFuture.set(null);
-
-            // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
-            f.channel().closeFuture().sync();
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while waiting for port {} shutdown", port, e);
-        } finally {
-            shutdown();
-        }
-    }
-
-    /**
-     * Shuts down {@link TcpHandler}}.
-     */
-    @Override
-    public ListenableFuture<Void> shutdown() {
-        final var result = SettableFuture.<Void>create();
-        workerGroup.shutdownGracefully();
-        // boss will shutdown as soon, as worker is down
-        bossGroup.shutdownGracefully().addListener(downResult -> {
-            final var cause = downResult.cause();
-            if (cause != null) {
-                result.setException(cause);
-            } else {
-                result.set(null);
-            }
-        });
-        return result;
-    }
-
-    /**
-     * Returns the number of connected clients / channels.
-     *
-     * @return number of connected clients / channels
-     */
-    public int getNumberOfConnections() {
-        return channelInitializer.size();
-    }
-
-    @Override
-    public ListenableFuture<Void> getIsOnlineFuture() {
-        return isOnlineFuture;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public String getAddress() {
-        return address;
-    }
-
-    public void setChannelInitializer(final TcpChannelInitializer channelInitializer) {
-        this.channelInitializer = channelInitializer;
-    }
-
-    /**
-     * Initiate event loop groups.
-     *
-     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
-     */
-    public void initiateEventLoopGroups(final ThreadConfiguration threadConfiguration, final boolean isEpollEnabled) {
-        if (isEpollEnabled) {
-            initiateEpollEventLoopGroups(threadConfiguration);
-        } else {
-            initiateNioEventLoopGroups(threadConfiguration);
-        }
-    }
-
-    /**
-     * Initiate Nio event loop groups.
-     *
-     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
-     */
-    public void initiateNioEventLoopGroups(final ThreadConfiguration threadConfiguration) {
-        socketChannelClass = NioServerSocketChannel.class;
-        if (threadConfiguration != null) {
-            bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
-            workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
-        } else {
-            bossGroup = new NioEventLoopGroup();
-            workerGroup = new NioEventLoopGroup();
-        }
-        ((NioEventLoopGroup)workerGroup).setIoRatio(100);
-    }
-
-    /**
-     * Initiate Epoll event loop groups with Nio as fall back.
-     *
-     * @param threadConfiguration the ThreadConfiguration
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void initiateEpollEventLoopGroups(final ThreadConfiguration threadConfiguration) {
-        try {
-            socketChannelClass = EpollServerSocketChannel.class;
-            if (threadConfiguration != null) {
-                bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
-                workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
-            } else {
-                bossGroup = new EpollEventLoopGroup();
-                workerGroup = new EpollEventLoopGroup();
-            }
-            ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
-            return;
-        } catch (RuntimeException ex) {
-            LOG.debug("Epoll initiation failed");
-        }
-
-        //Fallback mechanism
-        initiateNioEventLoopGroups(threadConfiguration);
-    }
-
-    public EventLoopGroup getWorkerGroup() {
-        return workerGroup;
-    }
-}
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacade.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacade.java
new file mode 100644 (file)
index 0000000..75ff690
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2013 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class implementing server over TCP / TLS for handling incoming connections.
+ *
+ * @author michal.polkorab
+ */
+final class TcpServerFacade extends ServerFacade implements ConnectionInitializer {
+    private static final Logger LOG = LoggerFactory.getLogger(TcpServerFacade.class);
+
+    /*
+     * High/low write watermarks
+     */
+    private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
+    private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
+    /*
+     * Write spin count. This tells Netty to immediately retry a non-blocking write this many times before moving on to
+     * selecting.
+     */
+    private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
+
+    private final TcpChannelInitializer channelInitializer;
+    private final Bootstrap bootstrap;
+
+    @GuardedBy("this")
+    private EventLoopGroup childGroup;
+
+    private TcpServerFacade(final EventLoopGroup parentGroup, final EventLoopGroup childGroup,
+            final Bootstrap bootstrap, final TcpChannelInitializer channelInitializer,
+            final InetSocketAddress localAddress) {
+        super(parentGroup, localAddress);
+        this.childGroup = requireNonNull(childGroup);
+        this.bootstrap = requireNonNull(bootstrap);
+        this.channelInitializer = requireNonNull(channelInitializer);
+
+        // Log-and-hook to prevent surprise timing
+        LOG.info("Switch listener started and ready to accept incoming TCP/TLS connections on {}", localAddress);
+    }
+
+    static ListenableFuture<TcpServerFacade> start(final ConnectionConfiguration connConfig, final boolean epollEnabled,
+            final TcpChannelInitializer channelInitializer) {
+        // Server bootstrap configuration
+        final var serverBootstrap = new ServerBootstrap()
+            .handler(new LoggingHandler(LogLevel.DEBUG))
+            .childHandler(channelInitializer)
+            .option(ChannelOption.SO_BACKLOG, 128)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.TCP_NODELAY , true)
+            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
+            .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
+
+        // Client bootstrap configuration
+        final var bootstrap = new Bootstrap().handler(channelInitializer);
+
+        /*
+         * Initialize groups.
+         *
+         * We generally do not perform IO-unrelated tasks, so we want to have all outstanding tasks completed before
+         * the executing thread goes back into select.
+         *
+         * Any other setting means Netty will measure the time it spent selecting and spend roughly proportional time
+         * executing tasks.
+         */
+        final var threadConfig = connConfig.getThreadConfiguration();
+        final var childIoRatio = 100;
+
+        // Captured by bindFuture callback below
+        final EventLoopGroup parentGroup;
+        final EventLoopGroup childGroup;
+        if (Epoll.isAvailable() && epollEnabled) {
+            // Epoll
+            serverBootstrap.channel(EpollServerSocketChannel.class);
+            bootstrap.channel(EpollSocketChannel.class);
+
+            parentGroup = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getBossThreadCount());
+            final var tmp = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getWorkerThreadCount());
+            tmp.setIoRatio(childIoRatio);
+            childGroup = tmp;
+        } else {
+            // NIO
+            serverBootstrap.channel(NioServerSocketChannel.class);
+            bootstrap.channel(NioSocketChannel.class);
+
+            parentGroup = threadConfig == null ? new NioEventLoopGroup()
+                : new NioEventLoopGroup(threadConfig.getBossThreadCount());
+
+            final var tmp = threadConfig == null ? new NioEventLoopGroup()
+                : new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
+            tmp.setIoRatio(childIoRatio);
+            childGroup = tmp;
+        }
+        serverBootstrap.group(parentGroup, childGroup);
+        bootstrap.group(childGroup);
+
+        // Attempt to bind the address
+        final var address = connConfig.getAddress();
+        final var port = connConfig.getPort();
+        final var bindFuture = address != null ? serverBootstrap.bind(address.getHostAddress(), port)
+            : serverBootstrap.bind(port);
+
+        // Clean up or hand off to caller
+        final var retFuture = SettableFuture.<TcpServerFacade>create();
+        bindFuture.addListener((ChannelFutureListener) future -> {
+            final var cause = future.cause();
+            if (cause != null) {
+                childGroup.shutdownGracefully();
+                parentGroup.shutdownGracefully();
+                retFuture.setException(cause);
+                return;
+            }
+
+            final var channel = future.channel();
+            final var handler = new TcpServerFacade(parentGroup, childGroup, bootstrap, channelInitializer,
+                (InetSocketAddress) channel.localAddress());
+            // Hook onto the channel's termination to initiate group shutdown
+            channel.closeFuture().addListener(closeFuture -> handler.shutdown());
+            retFuture.set(handler);
+        });
+        return retFuture;
+    }
+
+    /**
+     * Returns the number of connected clients / channels.
+     *
+     * @return number of connected clients / channels
+     */
+    public int getNumberOfConnections() {
+        return channelInitializer.size();
+    }
+
+    @Override
+    public void initiateConnection(final String host, final int port) {
+        try {
+            bootstrap.connect(host, port).sync();
+        } catch (InterruptedException e) {
+            LOG.error("Unable to initiate connection", e);
+        }
+    }
+
+    @Override
+    synchronized @NonNull ListenableFuture<Void> shutdown() {
+        final var local = childGroup;
+        if (local != null) {
+            LOG.info("Cleaning up TCP/TLS connection resources on {}", localAddress());
+            childGroup = null;
+            local.shutdownGracefully();
+        }
+        return super.shutdown();
+    }
+}
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpHandler.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpHandler.java
deleted file mode 100644 (file)
index 472db5f..0000000
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollDatagramChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class implementing server over UDP for handling incoming connections.
- *
- * @author michal.polkorab
- */
-public final class UdpHandler implements ServerFacade {
-    private static final Logger LOG = LoggerFactory.getLogger(UdpHandler.class);
-
-    private final SettableFuture<Void> isOnlineFuture = SettableFuture.create();
-    private final InetAddress startupAddress;
-    private final Runnable readyRunnable;
-
-    private int port;
-    private EventLoopGroup group;
-    private UdpChannelInitializer channelInitializer;
-    private Class<? extends DatagramChannel> datagramChannelClass;
-
-    /**
-     * Constructor of UdpHandler that listens on selected port.
-     *
-     * @param port listening port of UdpHandler server
-     */
-    public UdpHandler(final int port, final Runnable readyRunnable) {
-        this(null, port, readyRunnable);
-    }
-
-    /**
-     * Constructor of UdpHandler that listens on selected address and port.
-     * @param address listening address of UdpHandler server
-     * @param port listening port of UdpHandler server
-     */
-    public UdpHandler(final InetAddress address, final int port, final Runnable readyRunnable) {
-        this.port = port;
-        startupAddress = address;
-        this.readyRunnable = readyRunnable;
-    }
-
-    @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void run() {
-        final ChannelFuture f;
-        try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(group).channel(datagramChannelClass).option(ChannelOption.SO_BROADCAST, false)
-                .handler(channelInitializer);
-
-            if (startupAddress != null) {
-                f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
-            } else {
-                f = bootstrap.bind(port).sync();
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while binding port {}", port, e);
-            return;
-        } catch (Throwable throwable) {
-            // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
-            LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
-            throw throwable;
-        }
-
-        try {
-            InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
-            String address = isa.getHostString();
-
-            // Update port, as it may have been specified as 0
-            port = isa.getPort();
-
-            LOG.debug("Address from udpHandler: {}", address);
-            LOG.info("Switch listener started and ready to accept incoming udp connections on port: {}", port);
-            readyRunnable.run();
-            isOnlineFuture.set(null);
-
-            // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
-            f.channel().closeFuture().sync();
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while waiting for port {} shutdown", port, e);
-        } finally {
-            shutdown();
-        }
-    }
-
-    @Override
-    public ListenableFuture<Void> shutdown() {
-        final var result = SettableFuture.<Void>create();
-        group.shutdownGracefully().addListener(downResult -> {
-            final var cause = downResult.cause();
-            if (cause != null) {
-                result.setException(cause);
-            } else {
-                result.set(null);
-            }
-        });
-        return result;
-    }
-
-    @Override
-    public ListenableFuture<Void> getIsOnlineFuture() {
-        return isOnlineFuture;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setChannelInitializer(final UdpChannelInitializer channelInitializer) {
-        this.channelInitializer = channelInitializer;
-    }
-
-    /**
-     * Initiate event loop groups.
-     *
-     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
-     */
-    public void initiateEventLoopGroups(final ThreadConfiguration threadConfiguration, final boolean isEpollEnabled) {
-        if (isEpollEnabled) {
-            initiateEpollEventLoopGroups(threadConfiguration);
-        } else {
-            initiateNioEventLoopGroups(threadConfiguration);
-        }
-    }
-
-    /**
-     * Initiate Nio event loop groups.
-     *
-     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
-     */
-    public void initiateNioEventLoopGroups(final ThreadConfiguration threadConfiguration) {
-        datagramChannelClass = NioDatagramChannel.class;
-        if (threadConfiguration != null) {
-            group = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
-        } else {
-            group = new NioEventLoopGroup();
-        }
-    }
-
-    /**
-     * Initiate Epoll event loop groups with Nio as fall back.
-     *
-     * @param threadConfiguration the ThreadConfiguration
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    protected void initiateEpollEventLoopGroups(final ThreadConfiguration threadConfiguration) {
-        try {
-            datagramChannelClass = EpollDatagramChannel.class;
-            if (threadConfiguration != null) {
-                group = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
-            } else {
-                group = new EpollEventLoopGroup();
-            }
-            return;
-        } catch (RuntimeException ex) {
-            LOG.debug("Epoll initiation failed");
-        }
-
-        //Fallback mechanism
-        initiateNioEventLoopGroups(threadConfiguration);
-    }
-}
diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacade.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacade.java
new file mode 100644 (file)
index 0000000..a15a617
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2014 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import java.net.InetSocketAddress;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class implementing server over UDP for handling incoming connections.
+ *
+ * @author michal.polkorab
+ */
+final class UdpServerFacade extends ServerFacade {
+    private static final Logger LOG = LoggerFactory.getLogger(UdpServerFacade.class);
+
+    private UdpServerFacade(final EventLoopGroup group, final InetSocketAddress localAddress) {
+        super(group, localAddress);
+
+        LOG.debug("Address from udpHandler: {}", localAddress);
+        LOG.info("Switch listener started and ready to accept incoming udp connections on port: {}",
+            localAddress.getPort());
+    }
+
+    static ListenableFuture<UdpServerFacade> start(final ConnectionConfiguration connConfig, final boolean epollEnabled,
+            final UdpChannelInitializer channelInitializer) {
+        // Client bootstrap configuration
+        final var bootstrap = new Bootstrap().handler(channelInitializer).option(ChannelOption.SO_BROADCAST, false);
+        final var threadConfig = connConfig.getThreadConfiguration();
+        final var threadCount = threadConfig == null ? 0 : threadConfig.getWorkerThreadCount();
+
+        // Captured by bindFuture callback below
+        final EventLoopGroup group;
+        if (Epoll.isAvailable() && epollEnabled) {
+            // Epoll
+            bootstrap.channel(EpollDatagramChannel.class);
+            group = new EpollEventLoopGroup(threadCount);
+        } else {
+            // NIO
+            bootstrap.channel(NioDatagramChannel.class);
+            group = new NioEventLoopGroup(threadCount);
+        }
+        bootstrap.group(group);
+
+        // Attempt to bind the address
+        final var address = connConfig.getAddress();
+        final var port = connConfig.getPort();
+        final var bindFuture = address != null ? bootstrap.bind(address.getHostAddress(), port) : bootstrap.bind(port);
+
+        // Clean up or hand off to caller
+        final var retFuture = SettableFuture.<UdpServerFacade>create();
+        bindFuture.addListener((ChannelFutureListener) future -> {
+            final var cause = future.cause();
+            if (cause != null) {
+                group.shutdownGracefully();
+                retFuture.setException(cause);
+            } else {
+                retFuture.set(new UdpServerFacade(group, (InetSocketAddress) future.channel().localAddress()));
+            }
+        });
+        return retFuture;
+    }
+}
diff --git a/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java b/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java
deleted file mode 100644 (file)
index 9d3f1ce..0000000
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.unix.Errors;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.concurrent.ExecutionException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
-import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
-
-/**
- * Unit tests for TcpHandler.
- *
- * @author jameshall
- */
-@RunWith(MockitoJUnitRunner.class)
-public class TcpHandlerTest {
-    private final InetAddress serverAddress = InetAddress.getLoopbackAddress();
-
-    @Mock
-    ChannelHandlerContext mockChHndlrCtx;
-    @Mock
-    TcpChannelInitializer mockChannelInitializer;
-    @Mock
-    SwitchConnectionHandler mockSwitchConnHndler;
-    @Mock
-    SerializationFactory mockSerializationFactory;
-    @Mock
-    DeserializationFactory mockDeserializationFactory;
-
-    TcpHandler tcpHandler;
-
-    /**
-     * Test run with null address set.
-     */
-    @Test
-    public void testRunWithNullAddress() throws IOException, InterruptedException, ExecutionException  {
-        tcpHandler = new TcpHandler(null, 0, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        assertEquals("failed to start server", true, startupServer(false)) ;
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
-        shutdownServer();
-    }
-
-    /**
-     * Test run with null address set on Epoll native transport.
-     */
-    @Test
-    public void testRunWithNullAddressOnEpoll() throws IOException, InterruptedException, ExecutionException  {
-        tcpHandler = new TcpHandler(null, 0, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        //Use Epoll native transport
-        assertEquals("failed to start server", true, startupServer(true)) ;
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
-        shutdownServer();
-    }
-
-    /**
-     * Test run with address set.
-     */
-    @Test
-    public void testRunWithAddress() throws IOException, InterruptedException, ExecutionException  {
-        tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        assertEquals("failed to start server", true, startupServer(false)) ;
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
-        shutdownServer();
-    }
-
-    /**
-     * Test run with address set on Epoll native transport.
-     */
-    @Test
-    public void testRunWithAddressOnEpoll() throws IOException, InterruptedException, ExecutionException  {
-        tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        //Use Epoll native transport
-        assertEquals("failed to start server", true, startupServer(true));
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
-        shutdownServer();
-    }
-
-    /**
-     * Test run with encryption.
-     */
-    @Test
-    public void testRunWithEncryption() throws InterruptedException, IOException, ExecutionException {
-        int serverPort = 28001;
-        tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        assertEquals("failed to start server", true, startupServer(false));
-        assertEquals("wrong connection count", 0, tcpHandler.getNumberOfConnections());
-        assertEquals("wrong port", serverPort, tcpHandler.getPort());
-        assertEquals("wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
-
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
-
-        shutdownServer();
-    }
-
-    /**
-     * Test run with encryption on Epoll native transport.
-     */
-    @Test
-    public void testRunWithEncryptionOnEpoll() throws InterruptedException, IOException, ExecutionException {
-        int serverPort = 28001;
-        tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
-        tcpHandler.setChannelInitializer(mockChannelInitializer);
-
-        //Use Epoll native transport
-        assertEquals("failed to start server", true, startupServer(true));
-        assertEquals("wrong connection count", 0, tcpHandler.getNumberOfConnections());
-        assertEquals("wrong port", serverPort, tcpHandler.getPort());
-        assertEquals("wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
-
-        assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
-
-        shutdownServer();
-    }
-
-    /**
-     * Test run on already used port.
-     */
-    @Test(expected = BindException.class)
-    public void testSocketAlreadyInUse() throws IOException {
-        int serverPort = 28001;
-        Socket firstBinder = new Socket();
-
-        try (firstBinder) {
-            firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
-            tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
-            tcpHandler.setChannelInitializer(mockChannelInitializer);
-            tcpHandler.initiateEventLoopGroups(null, false);
-            tcpHandler.run();
-        }
-    }
-
-    /**
-     * Test run on already used port.
-     */
-    @Test
-    public void testSocketAlreadyInUseOnEpoll() throws IOException {
-        int serverPort = 28001;
-        Socket firstBinder = new Socket();
-
-        try (firstBinder) {
-            firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
-
-            tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
-            tcpHandler.setChannelInitializer(mockChannelInitializer);
-            //Use Epoll native transport
-            tcpHandler.initiateEventLoopGroups(null, true);
-            tcpHandler.run();
-            fail("Expected BindException or Errors.NativeIoException");
-        } catch (BindException | Errors.NativeIoException e) {
-            // expected
-        }
-    }
-
-    /**
-     * Trigger the server shutdown and wait 2 seconds for completion.
-     */
-    private void shutdownServer() throws InterruptedException, ExecutionException {
-        final var shutdownRet = tcpHandler.shutdown() ;
-        assertNull(shutdownRet.get());
-    }
-
-    private Boolean startupServer(final boolean isEpollEnabled) throws InterruptedException {
-        final var online = tcpHandler.getIsOnlineFuture();
-        /**
-         * Test EPoll based native transport if isEpollEnabled is true.
-         * Else use Nio based transport.
-         */
-        tcpHandler.initiateEventLoopGroups(null, isEpollEnabled);
-        new Thread(tcpHandler).start();
-        int retry = 0;
-        while (online.isDone() != true && retry++ < 20) {
-            Thread.sleep(100);
-        }
-        return online.isDone();
-    }
-
-    private static Boolean clientConnection(final int port) throws IOException {
-        // Connect, and disconnect
-        Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
-        Boolean result = socket.isConnected();
-        socket.close() ;
-        return result ;
-    }
-}
diff --git a/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacadeTest.java b/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpServerFacadeTest.java
new file mode 100644 (file)
index 0000000..23dade4
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
+import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
+import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
+
+@ExtendWith(MockitoExtension.class)
+class TcpServerFacadeTest {
+    private final InetAddress serverAddress = InetAddress.getLoopbackAddress();
+
+    @Mock
+    private ConnectionConfiguration connConfig;
+    @Mock
+    private ChannelHandlerContext mockChHndlrCtx;
+    @Mock
+    private TcpChannelInitializer mockChannelInitializer;
+    @Mock
+    private SwitchConnectionHandler mockSwitchConnHndler;
+    @Mock
+    private SerializationFactory mockSerializationFactory;
+    @Mock
+    private DeserializationFactory mockDeserializationFactory;
+
+    private TcpServerFacade tcpHandler;
+
+    @AfterEach
+    void afterEach() throws Exception {
+        if (tcpHandler != null) {
+            tcpHandler.shutdown().get(10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Test run with null address set.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testRunWithNullAddress(final boolean epollEnabled) {
+        tcpHandler = assertFacade(null, 0, epollEnabled);
+        assertTrue(clientConnection(tcpHandler.localAddress().getPort())) ;
+    }
+
+    /**
+     * Test run with address set.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testRunWithAddress(final boolean epollEnabled) {
+        tcpHandler = assertFacade(serverAddress, 0, epollEnabled);
+        assertTrue(clientConnection(tcpHandler.localAddress().getPort())) ;
+    }
+
+    /**
+     * Test run with encryption.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    public void testRunWithEncryption(final boolean epollEnabled) {
+        final int serverPort = 28001;
+        tcpHandler = assertFacade(serverAddress, serverPort, epollEnabled);
+        assertEquals(0, tcpHandler.getNumberOfConnections());
+        assertEquals(serverPort, tcpHandler.localAddress().getPort());
+        assertEquals(serverAddress.getHostAddress(), tcpHandler.localAddress().getHostString());
+
+        assertTrue(clientConnection(serverPort));
+    }
+
+    /**
+     * Test run on already used port.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testSocketAlreadyInUse(final boolean epollEnabled) throws Exception {
+        final int serverPort = 28001;
+
+        try (var firstBinder = new Socket()) {
+            firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
+
+            doReturn(serverAddress).when(connConfig).getAddress();
+            doReturn(serverPort).when(connConfig).getPort();
+
+            final var future = TcpServerFacade.start(connConfig, epollEnabled, mockChannelInitializer);
+            final var cause = assertThrows(ExecutionException.class, () -> future.get(1500, TimeUnit.MILLISECONDS))
+                .getCause();
+            assertThat(cause.getMessage(), containsString("Address already in use"));
+        }
+    }
+
+    private TcpServerFacade assertFacade(final InetAddress address, final int port, final boolean epollEnabled) {
+        doReturn(address).when(connConfig).getAddress();
+        doReturn(port).when(connConfig).getPort();
+
+        final var future = TcpServerFacade.start(connConfig, epollEnabled, mockChannelInitializer);
+        try {
+            return future.get(1500, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static boolean clientConnection(final int port) {
+        // Connect, and disconnect
+        try (var socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
+            return socket.isConnected();
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+}
diff --git a/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacadeTest.java b/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/UdpServerFacadeTest.java
new file mode 100644 (file)
index 0000000..6254313
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2014 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.Mockito.doReturn;
+
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+
+@ExtendWith(MockitoExtension.class)
+class UdpServerFacadeTest {
+    @Mock
+    private ConnectionConfiguration connConfig;
+    @Mock
+    private UdpChannelInitializer udpChannelInitializerMock;
+    private UdpServerFacade udpHandler;
+
+    @AfterEach
+    void afterEach() throws Exception {
+        if (udpHandler != null) {
+            udpHandler.shutdown().get(10, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Test to create UdpHandler with empty address and zero port.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testWithEmptyAddress(final boolean epollEnabled) {
+        udpHandler = assertFacade(null, 0, epollEnabled);
+        assertNotEquals(0, udpHandler.localAddress().getPort());
+    }
+
+
+    /**
+     * Test to create UdpHandler with fill address and given port.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testWithAddressAndPort(final boolean epollEnabled) throws Exception {
+        final int port = 9874;
+        udpHandler = assertFacade(InetAddress.getLocalHost(), port, epollEnabled);
+        assertEquals(port, udpHandler.localAddress().getPort());
+    }
+
+    private UdpServerFacade assertFacade(final InetAddress address, final int port, final boolean epollEnabled) {
+        doReturn(address).when(connConfig).getAddress();
+        doReturn(port).when(connConfig).getPort();
+
+        final var future = UdpServerFacade.start(connConfig, epollEnabled, udpChannelInitializerMock);
+        try {
+            return future.get(1500, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new AssertionError(e);
+        }
+    }
+}
index 7c5328812262e8327a1b791e65c6f7dd28532f07..894ab2547cdab738254bf3a1fb148b76bf795ed1 100755 (executable)
@@ -9,7 +9,10 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 
 import java.net.InetAddress;
 import java.util.List;
@@ -18,6 +21,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
 import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl;
@@ -62,21 +66,38 @@ import org.opendaylight.yangtools.yang.common.Uint32;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class SwitchConnectionProviderImpl02Test {
-    @Mock DiagStatusService diagStatusService;
-    @Mock SwitchConnectionHandler handler;
-    @Mock OFGeneralSerializer serializer;
-    @Mock OFGeneralDeserializer deserializer;
-    @Mock OFDeserializer<ErrorMessage> deserializerError;
-    @Mock OFDeserializer<ExperimenterDataOfChoice> deserializerExpMsg;
-    @Mock OFDeserializer<ExperimenterDataOfChoice> deserializerMultipartRplMsg;
-    @Mock OFDeserializer<QueueProperty> deserializerQueueProperty;
-    @Mock OFDeserializer<MeterBandExperimenterCase> deserializerMeterBandExpCase;
-    @Mock OFSerializer<ExperimenterDataOfChoice> serializerExperimenterInput;
-    @Mock OFSerializer<ExperimenterDataOfChoice> serializerMultipartRequestExpCase;
-    @Mock OFSerializer<MeterBandExperimenterCase> serializerMeterBandExpCase;
-    @Mock ConnectionConfigurationImpl config;
     private static final int CHANNEL_OUTBOUND_QUEUE_SIZE = 1024;
     private static final int SWITCH_IDLE_TIMEOUT = 2000;
+
+    @Mock
+    DiagStatusService diagStatusService;
+    @Mock
+    ServiceRegistration diagReg;
+    @Mock
+    SwitchConnectionHandler handler;
+    @Mock
+    OFGeneralSerializer serializer;
+    @Mock
+    OFGeneralDeserializer deserializer;
+    @Mock
+    OFDeserializer<ErrorMessage> deserializerError;
+    @Mock
+    OFDeserializer<ExperimenterDataOfChoice> deserializerExpMsg;
+    @Mock
+    OFDeserializer<ExperimenterDataOfChoice> deserializerMultipartRplMsg;
+    @Mock
+    OFDeserializer<QueueProperty> deserializerQueueProperty;
+    @Mock
+    OFDeserializer<MeterBandExperimenterCase> deserializerMeterBandExpCase;
+    @Mock
+    OFSerializer<ExperimenterDataOfChoice> serializerExperimenterInput;
+    @Mock
+    OFSerializer<ExperimenterDataOfChoice> serializerMultipartRequestExpCase;
+    @Mock
+    OFSerializer<MeterBandExperimenterCase> serializerMeterBandExpCase;
+    @Mock
+    ConnectionConfigurationImpl config;
+
     private TlsConfiguration tlsConfiguration;
     private SwitchConnectionProviderImpl provider;
 
@@ -90,6 +111,7 @@ public class SwitchConnectionProviderImpl02Test {
         if (protocol != null) {
             createConfig(protocol);
         }
+        doReturn(diagReg).when(diagStatusService).register(any());
         provider = new SwitchConnectionProviderImpl(diagStatusService, config);
     }
 
@@ -108,7 +130,6 @@ public class SwitchConnectionProviderImpl02Test {
         config.setTransferProtocol(protocol);
     }
 
-
     /**
      * Test getServerFacade.
      */
@@ -123,10 +144,10 @@ public class SwitchConnectionProviderImpl02Test {
     /**
      * Test shutdown on unconfigured provider.
      */
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testShutdownUnconfigured() throws Exception {
         startUp(TransportProtocol.TCP);
-        provider.shutdown();
+        assertThrows(IllegalStateException.class, provider::shutdown);
     }
 
     /**
@@ -148,15 +169,14 @@ public class SwitchConnectionProviderImpl02Test {
     public void testUnregisterExistingKeys() throws Exception {
         startUp(TransportProtocol.TCP);
         // -- registerActionSerializer
-        final ExperimenterActionSerializerKey key1 =
-            new ExperimenterActionSerializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42), TestSubType.VALUE);
+        final var key1 = new ExperimenterActionSerializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            TestSubType.VALUE);
         provider.registerActionSerializer(key1, serializer);
         assertTrue("Wrong -- unregister ActionSerializer", provider.unregisterSerializer(key1));
         assertFalse("Wrong -- unregister ActionSerializer by not existing key",
                 provider.unregisterSerializer(key1));
         // -- registerActionDeserializer
-        final ExperimenterActionDeserializerKey key2
-            = new ExperimenterActionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
+        final var key2 = new ExperimenterActionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
         provider.registerActionDeserializer(key2, deserializer);
         assertTrue("Wrong -- unregister ActionDeserializer", provider.unregisterDeserializer(key2));
         assertFalse("Wrong -- unregister ActionDeserializer by not existing key",
@@ -169,35 +189,33 @@ public class SwitchConnectionProviderImpl02Test {
         assertFalse("Wrong -- unregister InstructionSerializer by not existing key",
                 provider.unregisterSerializer(key3));
         // -- registerInstructionDeserializer
-        final ExperimenterInstructionDeserializerKey key4 =
-            new ExperimenterInstructionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
+        final var key4 = new ExperimenterInstructionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
         provider.registerInstructionDeserializer(key4, deserializer);
         assertTrue("Wrong -- unregister InstructionDeserializer", provider.unregisterDeserializer(key4));
         assertFalse("Wrong -- unregister InstructionDeserializer by not existing key",
                 provider.unregisterDeserializer(key4));
         // -- registerMatchEntryDeserializer
-        final MatchEntryDeserializerKey key5 =
-            new MatchEntryDeserializerKey(EncodeConstants.OF_VERSION_1_0, 0x8000, 42);
+        final var key5 = new MatchEntryDeserializerKey(EncodeConstants.OF_VERSION_1_0, 0x8000, 42);
         provider.registerMatchEntryDeserializer(key5, deserializer);
         assertTrue("Wrong -- unregister MatchEntryDeserializer", provider.unregisterDeserializer(key5));
         assertFalse("Wrong -- unregister MatchEntryDeserializer by not existing key",
                 provider.unregisterDeserializer(key5));
         // -- registerErrorDeserializer
-        final ExperimenterIdDeserializerKey key6 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
-                Uint32.valueOf(42), ErrorMessage.class);
+        final var key6 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            ErrorMessage.class);
         provider.registerErrorDeserializer(key6, deserializerError);
         assertTrue("Wrong -- unregister ErrorDeserializer", provider.unregisterDeserializer(key6));
         assertFalse("Wrong -- unregister ErrorDeserializer by not existing key",
                 provider.unregisterDeserializer(key6));
         // -- registerExperimenterMessageDeserializer
-        final ExperimenterIdDeserializerKey key7 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
-                Uint32.valueOf(42), ExperimenterMessage.class);
+        final var key7 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            ExperimenterMessage.class);
         provider.registerExperimenterMessageDeserializer(key7, deserializerExpMsg);
         assertTrue("Wrong -- unregister ExperimenterMessageDeserializer", provider.unregisterDeserializer(key7));
         assertFalse("Wrong -- unregister ExperimenterMessageDeserializer by not existing key",
                 provider.unregisterDeserializer(key7));
         // -- registerMultipartReplyMessageDeserializer
-        final ExperimenterIdDeserializerKey key8 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
+        final var key8 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
                 Uint32.valueOf(42), MultipartReplyMessage.class);
         provider.registerMultipartReplyMessageDeserializer(key8, deserializerMultipartRplMsg);
         assertTrue("Wrong -- unregister MultipartReplyMessageDeserializer",
@@ -205,70 +223,66 @@ public class SwitchConnectionProviderImpl02Test {
         assertFalse("Wrong -- unregister MultipartReplyMessageDeserializer by not existing key",
                 provider.unregisterDeserializer(key8));
         // -- registerMultipartReplyTFDeserializer
-        final ExperimenterIdDeserializerKey key9 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
-                Uint32.valueOf(42), MultipartReplyMessage.class);
+        final var key9 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            MultipartReplyMessage.class);
         provider.registerMultipartReplyTFDeserializer(key9, deserializer);
         assertTrue("Wrong -- unregister MultipartReplyTFDeserializer", provider.unregisterDeserializer(key9));
         assertFalse("Wrong -- unregister MultipartReplyTFDeserializer by non existing key",
                 provider.unregisterDeserializer(key9));
         // -- registerQueuePropertyDeserializer
-        final ExperimenterIdDeserializerKey key10 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
-                Uint32.valueOf(42), QueueProperty.class);
+        final var key10 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            QueueProperty.class);
         provider.registerQueuePropertyDeserializer(key10, deserializerQueueProperty);
         assertTrue("Wrong -- unregister QueuePropertyDeserializer", provider.unregisterDeserializer(key10));
         assertFalse("Wrong -- unregister QueuePropertyDeserializer by not existing key",
                 provider.unregisterDeserializer(key10));
         // -- registerMeterBandDeserializer
-        final ExperimenterIdDeserializerKey key11 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
-                Uint32.valueOf(42), MeterBandExperimenterCase.class);
+        final var key11 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            MeterBandExperimenterCase.class);
         provider.registerMeterBandDeserializer(key11, deserializerMeterBandExpCase);
         assertTrue("Wrong -- unregister MeterBandDeserializer", provider.unregisterDeserializer(key11));
         assertFalse("Wrong -- unregister MeterBandDeserializer by not existing key",
                 provider.unregisterDeserializer(key11));
         // -- registerExperimenterMessageSerializer
-        ExperimenterIdSerializerKey<ExperimenterDataOfChoice> key12 =
-            new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
-                ExperimenterDataOfChoice.class);
+        final var key12 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            ExperimenterDataOfChoice.class);
         provider.registerExperimenterMessageSerializer(key12, serializerExperimenterInput);
         assertTrue("Wrong -- unregister ExperimenterMessageSerializer", provider.unregisterSerializer(key12));
         assertFalse("Wrong -- unregister ExperimenterMessageSerializer by not existing key",
                 provider.unregisterSerializer(key12));
         //registerMultipartRequestSerializer
-        ExperimenterIdSerializerKey<ExperimenterDataOfChoice> key13 =
-            new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
-                ExperimenterDataOfChoice.class);
+        final var key13 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            ExperimenterDataOfChoice.class);
         provider.registerMultipartRequestSerializer(key13, serializerMultipartRequestExpCase);
         assertTrue("Wrong -- unregister MultipartRequestSerializer", provider.unregisterSerializer(key13));
         assertFalse("Wrong -- unregister MultipartRequestSerializer by not existing key",
                 provider.unregisterSerializer(key13));
         // -- registerMultipartRequestTFSerializer
-        final ExperimenterIdSerializerKey<TableFeatureProperties> key14 =
-            new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
-                TableFeatureProperties.class);
+        final var key14 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+            TableFeatureProperties.class);
         provider.registerMultipartRequestTFSerializer(key14, serializer);
         assertTrue("Wrong -- unregister MultipartRequestTFSerializer", provider.unregisterSerializer(key14));
         assertFalse("Wrong -- unregister MultipartRequestTFSerializer by not existing key",
                 provider.unregisterSerializer(key14));
         // -- registerMeterBandSerializer
-        final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key15 =
-            new ExperimenterIdMeterSubTypeSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
-                MeterBandExperimenterCase.class,null);
+        final var key15 = new ExperimenterIdMeterSubTypeSerializerKey<>(EncodeConstants.OF_VERSION_1_0,
+            Uint32.valueOf(42), MeterBandExperimenterCase.class,null);
         provider.registerMeterBandSerializer(key15, serializerMeterBandExpCase);
         assertTrue("Wrong -- unregister MeterBandSerializer", provider.unregisterSerializer(key15));
         assertFalse("Wrong -- unregister MeterBandSerializer by not existing key",
                 provider.unregisterSerializer(key15));
         // -- registerMatchEntrySerializer
-        final MatchEntrySerializerKey<OpenflowBasicClass, InPort> key16 =
-            new MatchEntrySerializerKey<>(EncodeConstants.OF_VERSION_1_3, OpenflowBasicClass.VALUE, InPort.VALUE);
+        final var key16 = new MatchEntrySerializerKey<>(EncodeConstants.OF_VERSION_1_3, OpenflowBasicClass.VALUE,
+            InPort.VALUE);
         provider.registerMatchEntrySerializer(key16, serializer);
         assertTrue("Wrong -- unregister MatchEntrySerializer", provider.unregisterSerializer(key16));
         assertFalse("Wrong -- unregister MatchEntrySerializer by not existing key",
                 provider.unregisterSerializer(key15));
         // -- registerSerializer
-        final MessageTypeKey key17 = new MessageTypeKey<>(EncodeConstants.OF_VERSION_1_3, TestSubType.class);
+        final var key17 = new MessageTypeKey<>(EncodeConstants.OF_VERSION_1_3, TestSubType.class);
         provider.registerSerializer(key17, serializer);
         // -- registerDeserializer
-        final MessageCodeKey key18 = new MessageCodeKey(EncodeConstants.OF_VERSION_1_3, 42, TestSubType.class);
+        final var key18 = new MessageCodeKey(EncodeConstants.OF_VERSION_1_3, 42, TestSubType.class);
         provider.registerDeserializer(key18, deserializer);
     }
 
diff --git a/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/UdpHandlerTest.java b/openflowjava/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/connection/UdpHandlerTest.java
deleted file mode 100644 (file)
index dbcd310..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.net.InetAddress;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpChannelInitializer;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for UdpHandler.
- *
- * @author madamjak
- */
-@RunWith(MockitoJUnitRunner.class)
-public class UdpHandlerTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UdpHandlerTest.class);
-
-    @Mock
-    private UdpChannelInitializer udpChannelInitializerMock;
-    private UdpHandler udpHandler;
-
-    /**
-     * Test to create UdpHandler with empty address and zero port.
-     */
-    @Test
-    public void testWithEmptyAddress() throws Exception {
-        udpHandler = new UdpHandler(null, 0, () -> { });
-        udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        assertTrue("Wrong - start server", startupServer(false));
-        udpHandler.getIsOnlineFuture().get(1500, TimeUnit.MILLISECONDS);
-        assertFalse("Wrong - port has been set to zero", udpHandler.getPort() == 0);
-        shutdownServer();
-    }
-
-    /**
-     * Test to create UdpHandler with empty address and zero port on Epoll native transport.
-     */
-    @Test
-    public void testWithEmptyAddressOnEpoll() throws Exception {
-        udpHandler = new UdpHandler(null, 0, () -> { });
-        udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        assertTrue("Wrong - start server", startupServer(true));
-        udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
-        assertFalse("Wrong - port has been set to zero", udpHandler.getPort() == 0);
-        shutdownServer();
-    }
-
-    /**
-     * Test to create UdpHandler with fill address and given port.
-     */
-    @Test
-    public void testWithAddressAndPort() throws Exception {
-        int port = 9874;
-        udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
-        udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        assertTrue("Wrong - start server", startupServer(false));
-        udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
-        assertEquals("Wrong - bad port number has been set", port, udpHandler.getPort());
-        shutdownServer();
-    }
-
-    /**
-     * Test to create UdpHandler with fill address and given port on Epoll native transport.
-     */
-    @Test
-    public void testWithAddressAndPortOnEpoll() throws Exception {
-        int port = 9874;
-        udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
-        udpHandler.setChannelInitializer(udpChannelInitializerMock);
-        assertTrue("Wrong - start server", startupServer(true));
-        udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
-        assertEquals("Wrong - bad port number has been set", port, udpHandler.getPort());
-        shutdownServer();
-    }
-
-    private Boolean startupServer(final boolean isEpollEnabled)
-            throws InterruptedException, ExecutionException {
-        final var online = udpHandler.getIsOnlineFuture();
-        /**
-         * Test EPoll based native transport if isEpollEnabled is true.
-         * Else use Nio based transport.
-         */
-        udpHandler.initiateEventLoopGroups(null, isEpollEnabled);
-        new Thread(udpHandler).start();
-
-        try {
-            online.get(10, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-            LOG.warn("Timeout while waiting for UDP handler to start", e);
-        }
-
-        return online.isDone();
-    }
-
-    private void shutdownServer() throws InterruptedException, ExecutionException, TimeoutException {
-        final var shutdownRet = udpHandler.shutdown() ;
-        assertNull("Wrong - shutdown failed", shutdownRet.get(10, TimeUnit.SECONDS));
-    }
-}
index a5dea958837059ff0e2987ff4f9ddf46424d0e93..75bf309f54209536a6c5f17da453cca4287e18f5 100644 (file)
@@ -41,8 +41,6 @@ import org.opendaylight.openflowjava.protocol.impl.clients.SleepEvent;
 import org.opendaylight.openflowjava.protocol.impl.clients.UdpSimpleClient;
 import org.opendaylight.openflowjava.protocol.impl.clients.WaitForMessageEvent;
 import org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderImpl;
-import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpHandler;
 import org.opendaylight.openflowjava.protocol.impl.core.connection.ConnectionConfigurationImpl;
 import org.opendaylight.openflowjava.util.ByteBufUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.KeystoreType;
@@ -109,17 +107,11 @@ public class IntegrationTest {
 
         switchConnectionProvider = new SwitchConnectionProviderImpl(diagStatusService, connConfig);
         switchConnectionProvider.startup(mockPlugin).get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-        if (protocol.equals(TransportProtocol.TCP) || protocol.equals(TransportProtocol.TLS)) {
-            final TcpHandler tcpHandler = (TcpHandler) switchConnectionProvider.getServerFacade();
-            port = tcpHandler.getPort();
-        } else {
-            final UdpHandler udpHandler = (UdpHandler) switchConnectionProvider.getServerFacade();
-            port = udpHandler.getPort();
-        }
+        port = switchConnectionProvider.getServerFacade().localAddress().getPort();
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
         switchConnectionProvider.close();
         LOGGER.debug("\n ending test -------------------------------");
     }