+++ /dev/null
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * OnlineProvider.
- *
- * @author mirehak
- */
-public interface OnlineProvider {
-
- ListenableFuture<Void> getIsOnlineFuture();
-}
/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2013 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.openflowjava.protocol.impl.core;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.EventLoopGroup;
+import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+
/**
* Server facade interface.
- *
- * @author mirehak
*/
-public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable {
- @Override
- void run();
+public abstract class ServerFacade {
+ private final @NonNull SettableFuture<Void> shutdownFuture = SettableFuture.create();
+ private final @NonNull InetSocketAddress localAddress;
+
+ @GuardedBy("this")
+ private EventLoopGroup group;
+
+ ServerFacade(final EventLoopGroup group, final InetSocketAddress localAddress) {
+ this.localAddress = requireNonNull(localAddress);
+ this.group = requireNonNull(group);
+
+ // Hook onto group shutting down -- that's when we know shutdownFuture is completed
+ group.terminationFuture().addListener(downResult -> {
+ final var cause = downResult.cause();
+ if (cause != null) {
+ shutdownFuture.setException(cause);
+ } else {
+ shutdownFuture.set(null);
+ }
+ });
+ }
+
+ /**
+ * Returns the local address.
+ *
+ * @return the local address
+ */
+ public final @NonNull InetSocketAddress localAddress() {
+ return localAddress;
+ }
+
+ /**
+ * Shuts down this facade. If this facade was already shut down, this method does nothing.
+ *
+ * @return a future completing when the facade has been shut down
+ */
+ synchronized @NonNull ListenableFuture<Void> shutdown() {
+ final var local = group;
+ if (local != null) {
+ group = null;
+ local.shutdownGracefully();
+ }
+ return shutdownFuture;
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * Shutdown provider interface.
- *
- * @author mirehak
- */
-public interface ShutdownProvider {
-
- ListenableFuture<Void> shutdown();
-}
*/
package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.epoll.Epoll;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
import org.opendaylight.infrautils.diagstatus.ServiceState;
-import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
@Component(service = SwitchConnectionProvider.class, factory = SwitchConnectionProviderImpl.FACTORY_NAME)
public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
-
- private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
// OSGi DS Component Factory name
private final SerializerRegistry serializerRegistry;
private final DeserializerRegistry deserializerRegistry;
private final DeserializationFactory deserializationFactory;
- private final ListeningExecutorService listeningExecutorService;
private final String diagStatusIdentifier;
- private final String threadName;
- private TcpConnectionInitializer connectionInitializer;
- private ServerFacade serverFacade;
- // FIXME: clean this up when no longer needed
- private final ServiceRegistration diagReg;
+ @GuardedBy("this")
+ private ListenableFuture<? extends ServerFacade> serverFacade;
+ @GuardedBy("this")
+ private ServiceRegistration diagReg;
public SwitchConnectionProviderImpl(final DiagStatusService diagStatus,
final @Nullable ConnectionConfiguration connConfig) {
diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
diagReg = diagStatus.register(diagStatusIdentifier);
- threadName = THREAD_NAME_PREFIX + connectionSuffix;
- listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
serializerRegistry = new SerializerRegistryImpl();
if (connConfig != null) {
serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
@Override
@Deactivate
- public void close() {
- shutdown();
- diagReg.close();
+ public synchronized void close() throws InterruptedException, ExecutionException {
+ final var local = diagReg;
+ if (local != null) {
+ diagReg = null;
+ final var future = serverFacade;
+ if (future != null) {
+ shutdownFacade(future).addListener(local::close, MoreExecutors.directExecutor());
+ } else {
+ local.close();
+ }
+ }
}
// ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
@Override
public ListenableFuture<Void> shutdown() {
LOG.debug("Shutdown summoned");
- if (serverFacade == null) {
- LOG.warn("Can not shutdown - not configured or started");
+ final ListenableFuture<? extends ServerFacade> local;
+ synchronized (this) {
+ if (diagReg == null) {
+ return Futures.immediateVoidFuture();
+ }
+ local = serverFacade;
+ }
+ if (local == null) {
throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
}
- final var serverFacadeShutdownFuture = serverFacade.shutdown();
- Executors.shutdownAndAwaitTermination(listeningExecutorService);
- return serverFacadeShutdownFuture;
+ return shutdownFacade(local);
+ }
+
+ private ListenableFuture<Void> shutdownFacade(final ListenableFuture<? extends ServerFacade> future) {
+ return Futures.transformAsync(future, facade -> {
+ final var shutdownFuture = facade.shutdown();
+ shutdownFuture.addListener(() -> removeFacade(future), MoreExecutors.directExecutor());
+ return shutdownFuture;
+ }, MoreExecutors.directExecutor());
+ }
+
+ private synchronized void removeFacade(final ListenableFuture<? extends ServerFacade> expected) {
+ if (expected == serverFacade) {
+ serverFacade = null;
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR, "Terminated"));
+ }
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public ListenableFuture<Void> startup(final SwitchConnectionHandler connectionHandler) {
+ public synchronized ListenableFuture<Void> startup(final SwitchConnectionHandler connectionHandler) {
LOG.debug("Startup summoned");
+
if (connConfig == null) {
return Futures.immediateFailedFuture(new IllegalStateException("Connection not configured"));
}
if (connectionHandler == null) {
return Futures.immediateFailedFuture(new IllegalStateException("SwitchConnectionHandler is not set"));
}
-
- try {
- serverFacade = createAndConfigureServer(connectionHandler);
- Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
- @Override
- public void onFailure(final Throwable throwable) {
- diagReg.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
- }
-
- @Override
- public void onSuccess(final Object result) {
- diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR,
- threadName + " terminated"));
- }
- }, MoreExecutors.directExecutor());
- return serverFacade.getIsOnlineFuture();
- } catch (RuntimeException e) {
- return Futures.immediateFailedFuture(e);
+ if (serverFacade != null) {
+ return Futures.immediateFailedFuture(new IllegalStateException("Provider already started"));
}
+
+ final var future = createAndConfigureServer(connectionHandler);
+ serverFacade = future;
+ Futures.addCallback(future, new FutureCallback<ServerFacade>() {
+ @Override
+ public void onSuccess(final ServerFacade result) {
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL));
+ LOG.info("Started {} connection on {}", connConfig.getTransferProtocol(), result.localAddress());
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.error("Failed to start {} connection on {}:{}", connConfig.getTransferProtocol(),
+ connConfig.getAddress(), connConfig.getPort(), cause);
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, cause));
+ }
+ }, MoreExecutors.directExecutor());
+
+ return Futures.transform(future, facade -> null, MoreExecutors.directExecutor());
}
- private ServerFacade createAndConfigureServer(final SwitchConnectionHandler connectionHandler) {
+ private ListenableFuture<? extends ServerFacade> createAndConfigureServer(
+ final SwitchConnectionHandler connectionHandler) {
LOG.debug("Configuring ..");
-
final var transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
if (transportProtocol == null) {
- throw new IllegalStateException("No transport protocol received in " + connConfig);
+ return Futures.immediateFailedFuture(
+ new IllegalStateException("No transport protocol received in " + connConfig));
}
final var factory = new ChannelInitializerFactory();
boolean isEpollEnabled = Epoll.isAvailable();
return switch (transportProtocol) {
- case TCP, TLS -> {
- final var tcpHandler = new TcpHandler(connConfig.getAddress(), connConfig.getPort(),
- () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
- final var channelInitializer = factory.createPublishingChannelInitializer();
- tcpHandler.setChannelInitializer(channelInitializer);
- tcpHandler.initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
- final var workerGroupFromTcpHandler = tcpHandler.getWorkerGroup();
- connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, channelInitializer,
- isEpollEnabled);
- yield tcpHandler;
- }
- case UDP -> {
- final var udpHandler = new UdpHandler(connConfig.getAddress(), connConfig.getPort(),
- () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
- udpHandler.initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
- udpHandler.setChannelInitializer(factory.createUdpChannelInitializer());
- yield udpHandler;
- }
+ case TCP, TLS -> TcpServerFacade.start(connConfig, isEpollEnabled,
+ factory.createPublishingChannelInitializer());
+ case UDP -> UdpServerFacade.start(connConfig, isEpollEnabled, factory.createUdpChannelInitializer());
};
}
+ @VisibleForTesting
public ServerFacade getServerFacade() {
- return serverFacade;
+ final ListenableFuture<? extends ServerFacade> future;
+ synchronized (this) {
+ future = serverFacade;
+ }
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("Failed to acquire facade", e);
+ }
}
@Override
@Override
public void initiateConnection(final String host, final int port) {
- connectionInitializer.initiateConnection(host, port);
+ final var facade = getServerFacade();
+ if (facade instanceof ConnectionInitializer initializer) {
+ initializer.initiateConnection(host, port);
+ } else {
+ throw new UnsupportedOperationException(facade + " does not support connections");
+ }
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Initializes (TCP) connection to device.
- *
- * @author martin.uhlir
- */
-final class TcpConnectionInitializer implements ConnectionInitializer {
- private static final Logger LOG = LoggerFactory.getLogger(TcpConnectionInitializer.class);
-
- private final Bootstrap bootstrap;
-
- TcpConnectionInitializer(final EventLoopGroup workerGroup, final TcpChannelInitializer channelInitializer,
- final boolean isEpollEnabled) {
- bootstrap = new Bootstrap()
- .group(requireNonNull(workerGroup, "WorkerGroup cannot be null"))
- .handler(channelInitializer)
- .channel(isEpollEnabled ? EpollSocketChannel.class : NioSocketChannel.class);
- }
-
- @Override
- public void initiateConnection(final String host, final int port) {
- try {
- bootstrap.connect(host, port).sync();
- } catch (InterruptedException e) {
- LOG.error("Unable to initiate connection", e);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.ServerSocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class implementing server over TCP / TLS for handling incoming connections.
- *
- * @author michal.polkorab
- */
-public class TcpHandler implements ServerFacade {
- /*
- * High/low write watermarks
- */
- private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
- private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
- /*
- * Write spin count. This tells netty to immediately retry a non-blocking
- * write this many times before moving on to selecting.
- */
- private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
-
- private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
-
- private final SettableFuture<Void> isOnlineFuture = SettableFuture.create();
- private final InetAddress startupAddress;
- private final Runnable readyRunnable;
-
- private int port;
- private String address;
- private EventLoopGroup workerGroup;
- private EventLoopGroup bossGroup;
-
- private TcpChannelInitializer channelInitializer;
-
- private Class<? extends ServerSocketChannel> socketChannelClass;
-
- /**
- * Constructor of TCPHandler that listens on selected port.
- *
- * @param port listening port of TCPHandler server
- */
- public TcpHandler(final int port, final Runnable readyRunnable) {
- this(null, port, readyRunnable);
- }
-
- /**
- * Constructor of TCPHandler that listens on selected address and port.
- * @param address listening address of TCPHandler server
- * @param port listening port of TCPHandler server
- */
- public TcpHandler(final InetAddress address, final int port, final Runnable readyRunnable) {
- this.port = port;
- startupAddress = address;
- this.readyRunnable = readyRunnable;
- }
-
- /**
- * Starts server on selected port.
- */
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void run() {
- /*
- * We generally do not perform IO-unrelated tasks, so we want to have
- * all outstanding tasks completed before the executing thread goes
- * back into select.
- *
- * Any other setting means netty will measure the time it spent selecting
- * and spend roughly proportional time executing tasks.
- */
- //workerGroup.setIoRatio(100);
-
- final ChannelFuture f;
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workerGroup)
- .channel(socketChannelClass)
- .handler(new LoggingHandler(LogLevel.DEBUG))
- .childHandler(channelInitializer)
- .option(ChannelOption.SO_BACKLOG, 128)
- .option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.TCP_NODELAY , true)
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
- .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
-
- if (startupAddress != null) {
- f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
- } else {
- f = bootstrap.bind(port).sync();
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted while binding port {}", port, e);
- return;
- } catch (Throwable throwable) {
- // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
- LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
- throw throwable;
- }
-
- try {
- InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
- address = isa.getHostString();
-
- // Update port, as it may have been specified as 0
- port = isa.getPort();
-
- LOG.debug("address from tcphandler: {}", address);
- LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
- readyRunnable.run();
- isOnlineFuture.set(null);
-
- // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
- f.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for port {} shutdown", port, e);
- } finally {
- shutdown();
- }
- }
-
- /**
- * Shuts down {@link TcpHandler}}.
- */
- @Override
- public ListenableFuture<Void> shutdown() {
- final var result = SettableFuture.<Void>create();
- workerGroup.shutdownGracefully();
- // boss will shutdown as soon, as worker is down
- bossGroup.shutdownGracefully().addListener(downResult -> {
- final var cause = downResult.cause();
- if (cause != null) {
- result.setException(cause);
- } else {
- result.set(null);
- }
- });
- return result;
- }
-
- /**
- * Returns the number of connected clients / channels.
- *
- * @return number of connected clients / channels
- */
- public int getNumberOfConnections() {
- return channelInitializer.size();
- }
-
- @Override
- public ListenableFuture<Void> getIsOnlineFuture() {
- return isOnlineFuture;
- }
-
- public int getPort() {
- return port;
- }
-
- public String getAddress() {
- return address;
- }
-
- public void setChannelInitializer(final TcpChannelInitializer channelInitializer) {
- this.channelInitializer = channelInitializer;
- }
-
- /**
- * Initiate event loop groups.
- *
- * @param threadConfiguration number of threads to be created, if not specified in threadConfig
- */
- public void initiateEventLoopGroups(final ThreadConfiguration threadConfiguration, final boolean isEpollEnabled) {
- if (isEpollEnabled) {
- initiateEpollEventLoopGroups(threadConfiguration);
- } else {
- initiateNioEventLoopGroups(threadConfiguration);
- }
- }
-
- /**
- * Initiate Nio event loop groups.
- *
- * @param threadConfiguration number of threads to be created, if not specified in threadConfig
- */
- public void initiateNioEventLoopGroups(final ThreadConfiguration threadConfiguration) {
- socketChannelClass = NioServerSocketChannel.class;
- if (threadConfiguration != null) {
- bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
- workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
- } else {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
- }
- ((NioEventLoopGroup)workerGroup).setIoRatio(100);
- }
-
- /**
- * Initiate Epoll event loop groups with Nio as fall back.
- *
- * @param threadConfiguration the ThreadConfiguration
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- protected void initiateEpollEventLoopGroups(final ThreadConfiguration threadConfiguration) {
- try {
- socketChannelClass = EpollServerSocketChannel.class;
- if (threadConfiguration != null) {
- bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
- workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
- } else {
- bossGroup = new EpollEventLoopGroup();
- workerGroup = new EpollEventLoopGroup();
- }
- ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
- return;
- } catch (RuntimeException ex) {
- LOG.debug("Epoll initiation failed");
- }
-
- //Fallback mechanism
- initiateNioEventLoopGroups(threadConfiguration);
- }
-
- public EventLoopGroup getWorkerGroup() {
- return workerGroup;
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2013 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class implementing server over TCP / TLS for handling incoming connections.
+ *
+ * @author michal.polkorab
+ */
+final class TcpServerFacade extends ServerFacade implements ConnectionInitializer {
+ private static final Logger LOG = LoggerFactory.getLogger(TcpServerFacade.class);
+
+ /*
+ * High/low write watermarks
+ */
+ private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
+ private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
+ /*
+ * Write spin count. This tells Netty to immediately retry a non-blocking write this many times before moving on to
+ * selecting.
+ */
+ private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
+
+ private final TcpChannelInitializer channelInitializer;
+ private final Bootstrap bootstrap;
+
+ @GuardedBy("this")
+ private EventLoopGroup childGroup;
+
+ private TcpServerFacade(final EventLoopGroup parentGroup, final EventLoopGroup childGroup,
+ final Bootstrap bootstrap, final TcpChannelInitializer channelInitializer,
+ final InetSocketAddress localAddress) {
+ super(parentGroup, localAddress);
+ this.childGroup = requireNonNull(childGroup);
+ this.bootstrap = requireNonNull(bootstrap);
+ this.channelInitializer = requireNonNull(channelInitializer);
+
+ // Log-and-hook to prevent surprise timing
+ LOG.info("Switch listener started and ready to accept incoming TCP/TLS connections on {}", localAddress);
+ }
+
+ static ListenableFuture<TcpServerFacade> start(final ConnectionConfiguration connConfig, final boolean epollEnabled,
+ final TcpChannelInitializer channelInitializer) {
+ // Server bootstrap configuration
+ final var serverBootstrap = new ServerBootstrap()
+ .handler(new LoggingHandler(LogLevel.DEBUG))
+ .childHandler(channelInitializer)
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.TCP_NODELAY , true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+ new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
+ .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
+
+ // Client bootstrap configuration
+ final var bootstrap = new Bootstrap().handler(channelInitializer);
+
+ /*
+ * Initialize groups.
+ *
+ * We generally do not perform IO-unrelated tasks, so we want to have all outstanding tasks completed before
+ * the executing thread goes back into select.
+ *
+ * Any other setting means Netty will measure the time it spent selecting and spend roughly proportional time
+ * executing tasks.
+ */
+ final var threadConfig = connConfig.getThreadConfiguration();
+ final var childIoRatio = 100;
+
+ // Captured by bindFuture callback below
+ final EventLoopGroup parentGroup;
+ final EventLoopGroup childGroup;
+ if (Epoll.isAvailable() && epollEnabled) {
+ // Epoll
+ serverBootstrap.channel(EpollServerSocketChannel.class);
+ bootstrap.channel(EpollSocketChannel.class);
+
+ parentGroup = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getBossThreadCount());
+ final var tmp = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getWorkerThreadCount());
+ tmp.setIoRatio(childIoRatio);
+ childGroup = tmp;
+ } else {
+ // NIO
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ bootstrap.channel(NioSocketChannel.class);
+
+ parentGroup = threadConfig == null ? new NioEventLoopGroup()
+ : new NioEventLoopGroup(threadConfig.getBossThreadCount());
+
+ final var tmp = threadConfig == null ? new NioEventLoopGroup()
+ : new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
+ tmp.setIoRatio(childIoRatio);
+ childGroup = tmp;
+ }
+ serverBootstrap.group(parentGroup, childGroup);
+ bootstrap.group(childGroup);
+
+ // Attempt to bind the address
+ final var address = connConfig.getAddress();
+ final var port = connConfig.getPort();
+ final var bindFuture = address != null ? serverBootstrap.bind(address.getHostAddress(), port)
+ : serverBootstrap.bind(port);
+
+ // Clean up or hand off to caller
+ final var retFuture = SettableFuture.<TcpServerFacade>create();
+ bindFuture.addListener((ChannelFutureListener) future -> {
+ final var cause = future.cause();
+ if (cause != null) {
+ childGroup.shutdownGracefully();
+ parentGroup.shutdownGracefully();
+ retFuture.setException(cause);
+ return;
+ }
+
+ final var channel = future.channel();
+ final var handler = new TcpServerFacade(parentGroup, childGroup, bootstrap, channelInitializer,
+ (InetSocketAddress) channel.localAddress());
+ // Hook onto the channel's termination to initiate group shutdown
+ channel.closeFuture().addListener(closeFuture -> handler.shutdown());
+ retFuture.set(handler);
+ });
+ return retFuture;
+ }
+
+ /**
+ * Returns the number of connected clients / channels.
+ *
+ * @return number of connected clients / channels
+ */
+ public int getNumberOfConnections() {
+ return channelInitializer.size();
+ }
+
+ @Override
+ public void initiateConnection(final String host, final int port) {
+ try {
+ bootstrap.connect(host, port).sync();
+ } catch (InterruptedException e) {
+ LOG.error("Unable to initiate connection", e);
+ }
+ }
+
+ @Override
+ synchronized @NonNull ListenableFuture<Void> shutdown() {
+ final var local = childGroup;
+ if (local != null) {
+ LOG.info("Cleaning up TCP/TLS connection resources on {}", localAddress());
+ childGroup = null;
+ local.shutdownGracefully();
+ }
+ return super.shutdown();
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollDatagramChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.DatagramChannel;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class implementing server over UDP for handling incoming connections.
- *
- * @author michal.polkorab
- */
-public final class UdpHandler implements ServerFacade {
- private static final Logger LOG = LoggerFactory.getLogger(UdpHandler.class);
-
- private final SettableFuture<Void> isOnlineFuture = SettableFuture.create();
- private final InetAddress startupAddress;
- private final Runnable readyRunnable;
-
- private int port;
- private EventLoopGroup group;
- private UdpChannelInitializer channelInitializer;
- private Class<? extends DatagramChannel> datagramChannelClass;
-
- /**
- * Constructor of UdpHandler that listens on selected port.
- *
- * @param port listening port of UdpHandler server
- */
- public UdpHandler(final int port, final Runnable readyRunnable) {
- this(null, port, readyRunnable);
- }
-
- /**
- * Constructor of UdpHandler that listens on selected address and port.
- * @param address listening address of UdpHandler server
- * @param port listening port of UdpHandler server
- */
- public UdpHandler(final InetAddress address, final int port, final Runnable readyRunnable) {
- this.port = port;
- startupAddress = address;
- this.readyRunnable = readyRunnable;
- }
-
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void run() {
- final ChannelFuture f;
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group).channel(datagramChannelClass).option(ChannelOption.SO_BROADCAST, false)
- .handler(channelInitializer);
-
- if (startupAddress != null) {
- f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
- } else {
- f = bootstrap.bind(port).sync();
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted while binding port {}", port, e);
- return;
- } catch (Throwable throwable) {
- // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
- LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
- throw throwable;
- }
-
- try {
- InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
- String address = isa.getHostString();
-
- // Update port, as it may have been specified as 0
- port = isa.getPort();
-
- LOG.debug("Address from udpHandler: {}", address);
- LOG.info("Switch listener started and ready to accept incoming udp connections on port: {}", port);
- readyRunnable.run();
- isOnlineFuture.set(null);
-
- // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
- f.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for port {} shutdown", port, e);
- } finally {
- shutdown();
- }
- }
-
- @Override
- public ListenableFuture<Void> shutdown() {
- final var result = SettableFuture.<Void>create();
- group.shutdownGracefully().addListener(downResult -> {
- final var cause = downResult.cause();
- if (cause != null) {
- result.setException(cause);
- } else {
- result.set(null);
- }
- });
- return result;
- }
-
- @Override
- public ListenableFuture<Void> getIsOnlineFuture() {
- return isOnlineFuture;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setChannelInitializer(final UdpChannelInitializer channelInitializer) {
- this.channelInitializer = channelInitializer;
- }
-
- /**
- * Initiate event loop groups.
- *
- * @param threadConfiguration number of threads to be created, if not specified in threadConfig
- */
- public void initiateEventLoopGroups(final ThreadConfiguration threadConfiguration, final boolean isEpollEnabled) {
- if (isEpollEnabled) {
- initiateEpollEventLoopGroups(threadConfiguration);
- } else {
- initiateNioEventLoopGroups(threadConfiguration);
- }
- }
-
- /**
- * Initiate Nio event loop groups.
- *
- * @param threadConfiguration number of threads to be created, if not specified in threadConfig
- */
- public void initiateNioEventLoopGroups(final ThreadConfiguration threadConfiguration) {
- datagramChannelClass = NioDatagramChannel.class;
- if (threadConfiguration != null) {
- group = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
- } else {
- group = new NioEventLoopGroup();
- }
- }
-
- /**
- * Initiate Epoll event loop groups with Nio as fall back.
- *
- * @param threadConfiguration the ThreadConfiguration
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- protected void initiateEpollEventLoopGroups(final ThreadConfiguration threadConfiguration) {
- try {
- datagramChannelClass = EpollDatagramChannel.class;
- if (threadConfiguration != null) {
- group = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
- } else {
- group = new EpollEventLoopGroup();
- }
- return;
- } catch (RuntimeException ex) {
- LOG.debug("Epoll initiation failed");
- }
-
- //Fallback mechanism
- initiateNioEventLoopGroups(threadConfiguration);
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import java.net.InetSocketAddress;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class implementing server over UDP for handling incoming connections.
+ *
+ * @author michal.polkorab
+ */
+final class UdpServerFacade extends ServerFacade {
+ private static final Logger LOG = LoggerFactory.getLogger(UdpServerFacade.class);
+
+ private UdpServerFacade(final EventLoopGroup group, final InetSocketAddress localAddress) {
+ super(group, localAddress);
+
+ LOG.debug("Address from udpHandler: {}", localAddress);
+ LOG.info("Switch listener started and ready to accept incoming udp connections on port: {}",
+ localAddress.getPort());
+ }
+
+ static ListenableFuture<UdpServerFacade> start(final ConnectionConfiguration connConfig, final boolean epollEnabled,
+ final UdpChannelInitializer channelInitializer) {
+ // Client bootstrap configuration
+ final var bootstrap = new Bootstrap().handler(channelInitializer).option(ChannelOption.SO_BROADCAST, false);
+ final var threadConfig = connConfig.getThreadConfiguration();
+ final var threadCount = threadConfig == null ? 0 : threadConfig.getWorkerThreadCount();
+
+ // Captured by bindFuture callback below
+ final EventLoopGroup group;
+ if (Epoll.isAvailable() && epollEnabled) {
+ // Epoll
+ bootstrap.channel(EpollDatagramChannel.class);
+ group = new EpollEventLoopGroup(threadCount);
+ } else {
+ // NIO
+ bootstrap.channel(NioDatagramChannel.class);
+ group = new NioEventLoopGroup(threadCount);
+ }
+ bootstrap.group(group);
+
+ // Attempt to bind the address
+ final var address = connConfig.getAddress();
+ final var port = connConfig.getPort();
+ final var bindFuture = address != null ? bootstrap.bind(address.getHostAddress(), port) : bootstrap.bind(port);
+
+ // Clean up or hand off to caller
+ final var retFuture = SettableFuture.<UdpServerFacade>create();
+ bindFuture.addListener((ChannelFutureListener) future -> {
+ final var cause = future.cause();
+ if (cause != null) {
+ group.shutdownGracefully();
+ retFuture.setException(cause);
+ } else {
+ retFuture.set(new UdpServerFacade(group, (InetSocketAddress) future.channel().localAddress()));
+ }
+ });
+ return retFuture;
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.unix.Errors;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.concurrent.ExecutionException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
-import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
-
-/**
- * Unit tests for TcpHandler.
- *
- * @author jameshall
- */
-@RunWith(MockitoJUnitRunner.class)
-public class TcpHandlerTest {
- private final InetAddress serverAddress = InetAddress.getLoopbackAddress();
-
- @Mock
- ChannelHandlerContext mockChHndlrCtx;
- @Mock
- TcpChannelInitializer mockChannelInitializer;
- @Mock
- SwitchConnectionHandler mockSwitchConnHndler;
- @Mock
- SerializationFactory mockSerializationFactory;
- @Mock
- DeserializationFactory mockDeserializationFactory;
-
- TcpHandler tcpHandler;
-
- /**
- * Test run with null address set.
- */
- @Test
- public void testRunWithNullAddress() throws IOException, InterruptedException, ExecutionException {
- tcpHandler = new TcpHandler(null, 0, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- assertEquals("failed to start server", true, startupServer(false)) ;
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
- shutdownServer();
- }
-
- /**
- * Test run with null address set on Epoll native transport.
- */
- @Test
- public void testRunWithNullAddressOnEpoll() throws IOException, InterruptedException, ExecutionException {
- tcpHandler = new TcpHandler(null, 0, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- //Use Epoll native transport
- assertEquals("failed to start server", true, startupServer(true)) ;
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
- shutdownServer();
- }
-
- /**
- * Test run with address set.
- */
- @Test
- public void testRunWithAddress() throws IOException, InterruptedException, ExecutionException {
- tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- assertEquals("failed to start server", true, startupServer(false)) ;
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort())) ;
- shutdownServer();
- }
-
- /**
- * Test run with address set on Epoll native transport.
- */
- @Test
- public void testRunWithAddressOnEpoll() throws IOException, InterruptedException, ExecutionException {
- tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- //Use Epoll native transport
- assertEquals("failed to start server", true, startupServer(true));
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
- shutdownServer();
- }
-
- /**
- * Test run with encryption.
- */
- @Test
- public void testRunWithEncryption() throws InterruptedException, IOException, ExecutionException {
- int serverPort = 28001;
- tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- assertEquals("failed to start server", true, startupServer(false));
- assertEquals("wrong connection count", 0, tcpHandler.getNumberOfConnections());
- assertEquals("wrong port", serverPort, tcpHandler.getPort());
- assertEquals("wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
-
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
-
- shutdownServer();
- }
-
- /**
- * Test run with encryption on Epoll native transport.
- */
- @Test
- public void testRunWithEncryptionOnEpoll() throws InterruptedException, IOException, ExecutionException {
- int serverPort = 28001;
- tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
-
- //Use Epoll native transport
- assertEquals("failed to start server", true, startupServer(true));
- assertEquals("wrong connection count", 0, tcpHandler.getNumberOfConnections());
- assertEquals("wrong port", serverPort, tcpHandler.getPort());
- assertEquals("wrong address", serverAddress.getHostAddress(), tcpHandler.getAddress());
-
- assertEquals("failed to connect client", true, clientConnection(tcpHandler.getPort()));
-
- shutdownServer();
- }
-
- /**
- * Test run on already used port.
- */
- @Test(expected = BindException.class)
- public void testSocketAlreadyInUse() throws IOException {
- int serverPort = 28001;
- Socket firstBinder = new Socket();
-
- try (firstBinder) {
- firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
- tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
- tcpHandler.initiateEventLoopGroups(null, false);
- tcpHandler.run();
- }
- }
-
- /**
- * Test run on already used port.
- */
- @Test
- public void testSocketAlreadyInUseOnEpoll() throws IOException {
- int serverPort = 28001;
- Socket firstBinder = new Socket();
-
- try (firstBinder) {
- firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
-
- tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
- tcpHandler.setChannelInitializer(mockChannelInitializer);
- //Use Epoll native transport
- tcpHandler.initiateEventLoopGroups(null, true);
- tcpHandler.run();
- fail("Expected BindException or Errors.NativeIoException");
- } catch (BindException | Errors.NativeIoException e) {
- // expected
- }
- }
-
- /**
- * Trigger the server shutdown and wait 2 seconds for completion.
- */
- private void shutdownServer() throws InterruptedException, ExecutionException {
- final var shutdownRet = tcpHandler.shutdown() ;
- assertNull(shutdownRet.get());
- }
-
- private Boolean startupServer(final boolean isEpollEnabled) throws InterruptedException {
- final var online = tcpHandler.getIsOnlineFuture();
- /**
- * Test EPoll based native transport if isEpollEnabled is true.
- * Else use Nio based transport.
- */
- tcpHandler.initiateEventLoopGroups(null, isEpollEnabled);
- new Thread(tcpHandler).start();
- int retry = 0;
- while (online.isDone() != true && retry++ < 20) {
- Thread.sleep(100);
- }
- return online.isDone();
- }
-
- private static Boolean clientConnection(final int port) throws IOException {
- // Connect, and disconnect
- Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
- Boolean result = socket.isConnected();
- socket.close() ;
- return result ;
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
+import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
+import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
+
+@ExtendWith(MockitoExtension.class)
+class TcpServerFacadeTest {
+ private final InetAddress serverAddress = InetAddress.getLoopbackAddress();
+
+ @Mock
+ private ConnectionConfiguration connConfig;
+ @Mock
+ private ChannelHandlerContext mockChHndlrCtx;
+ @Mock
+ private TcpChannelInitializer mockChannelInitializer;
+ @Mock
+ private SwitchConnectionHandler mockSwitchConnHndler;
+ @Mock
+ private SerializationFactory mockSerializationFactory;
+ @Mock
+ private DeserializationFactory mockDeserializationFactory;
+
+ private TcpServerFacade tcpHandler;
+
+ @AfterEach
+ void afterEach() throws Exception {
+ if (tcpHandler != null) {
+ tcpHandler.shutdown().get(10, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Test run with null address set.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testRunWithNullAddress(final boolean epollEnabled) {
+ tcpHandler = assertFacade(null, 0, epollEnabled);
+ assertTrue(clientConnection(tcpHandler.localAddress().getPort())) ;
+ }
+
+ /**
+ * Test run with address set.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testRunWithAddress(final boolean epollEnabled) {
+ tcpHandler = assertFacade(serverAddress, 0, epollEnabled);
+ assertTrue(clientConnection(tcpHandler.localAddress().getPort())) ;
+ }
+
+ /**
+ * Test run with encryption.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ public void testRunWithEncryption(final boolean epollEnabled) {
+ final int serverPort = 28001;
+ tcpHandler = assertFacade(serverAddress, serverPort, epollEnabled);
+ assertEquals(0, tcpHandler.getNumberOfConnections());
+ assertEquals(serverPort, tcpHandler.localAddress().getPort());
+ assertEquals(serverAddress.getHostAddress(), tcpHandler.localAddress().getHostString());
+
+ assertTrue(clientConnection(serverPort));
+ }
+
+ /**
+ * Test run on already used port.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testSocketAlreadyInUse(final boolean epollEnabled) throws Exception {
+ final int serverPort = 28001;
+
+ try (var firstBinder = new Socket()) {
+ firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
+
+ doReturn(serverAddress).when(connConfig).getAddress();
+ doReturn(serverPort).when(connConfig).getPort();
+
+ final var future = TcpServerFacade.start(connConfig, epollEnabled, mockChannelInitializer);
+ final var cause = assertThrows(ExecutionException.class, () -> future.get(1500, TimeUnit.MILLISECONDS))
+ .getCause();
+ assertThat(cause.getMessage(), containsString("Address already in use"));
+ }
+ }
+
+ private TcpServerFacade assertFacade(final InetAddress address, final int port, final boolean epollEnabled) {
+ doReturn(address).when(connConfig).getAddress();
+ doReturn(port).when(connConfig).getPort();
+
+ final var future = TcpServerFacade.start(connConfig, epollEnabled, mockChannelInitializer);
+ try {
+ return future.get(1500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ private static boolean clientConnection(final int port) {
+ // Connect, and disconnect
+ try (var socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
+ return socket.isConnected();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.Mockito.doReturn;
+
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
+
+@ExtendWith(MockitoExtension.class)
+class UdpServerFacadeTest {
+ @Mock
+ private ConnectionConfiguration connConfig;
+ @Mock
+ private UdpChannelInitializer udpChannelInitializerMock;
+ private UdpServerFacade udpHandler;
+
+ @AfterEach
+ void afterEach() throws Exception {
+ if (udpHandler != null) {
+ udpHandler.shutdown().get(10, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Test to create UdpHandler with empty address and zero port.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testWithEmptyAddress(final boolean epollEnabled) {
+ udpHandler = assertFacade(null, 0, epollEnabled);
+ assertNotEquals(0, udpHandler.localAddress().getPort());
+ }
+
+
+ /**
+ * Test to create UdpHandler with fill address and given port.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testWithAddressAndPort(final boolean epollEnabled) throws Exception {
+ final int port = 9874;
+ udpHandler = assertFacade(InetAddress.getLocalHost(), port, epollEnabled);
+ assertEquals(port, udpHandler.localAddress().getPort());
+ }
+
+ private UdpServerFacade assertFacade(final InetAddress address, final int port, final boolean epollEnabled) {
+ doReturn(address).when(connConfig).getAddress();
+ doReturn(port).when(connConfig).getPort();
+
+ final var future = UdpServerFacade.start(connConfig, epollEnabled, udpChannelInitializerMock);
+ try {
+ return future.get(1500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new AssertionError(e);
+ }
+ }
+}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import java.net.InetAddress;
import java.util.List;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl;
*/
@RunWith(MockitoJUnitRunner.class)
public class SwitchConnectionProviderImpl02Test {
- @Mock DiagStatusService diagStatusService;
- @Mock SwitchConnectionHandler handler;
- @Mock OFGeneralSerializer serializer;
- @Mock OFGeneralDeserializer deserializer;
- @Mock OFDeserializer<ErrorMessage> deserializerError;
- @Mock OFDeserializer<ExperimenterDataOfChoice> deserializerExpMsg;
- @Mock OFDeserializer<ExperimenterDataOfChoice> deserializerMultipartRplMsg;
- @Mock OFDeserializer<QueueProperty> deserializerQueueProperty;
- @Mock OFDeserializer<MeterBandExperimenterCase> deserializerMeterBandExpCase;
- @Mock OFSerializer<ExperimenterDataOfChoice> serializerExperimenterInput;
- @Mock OFSerializer<ExperimenterDataOfChoice> serializerMultipartRequestExpCase;
- @Mock OFSerializer<MeterBandExperimenterCase> serializerMeterBandExpCase;
- @Mock ConnectionConfigurationImpl config;
private static final int CHANNEL_OUTBOUND_QUEUE_SIZE = 1024;
private static final int SWITCH_IDLE_TIMEOUT = 2000;
+
+ @Mock
+ DiagStatusService diagStatusService;
+ @Mock
+ ServiceRegistration diagReg;
+ @Mock
+ SwitchConnectionHandler handler;
+ @Mock
+ OFGeneralSerializer serializer;
+ @Mock
+ OFGeneralDeserializer deserializer;
+ @Mock
+ OFDeserializer<ErrorMessage> deserializerError;
+ @Mock
+ OFDeserializer<ExperimenterDataOfChoice> deserializerExpMsg;
+ @Mock
+ OFDeserializer<ExperimenterDataOfChoice> deserializerMultipartRplMsg;
+ @Mock
+ OFDeserializer<QueueProperty> deserializerQueueProperty;
+ @Mock
+ OFDeserializer<MeterBandExperimenterCase> deserializerMeterBandExpCase;
+ @Mock
+ OFSerializer<ExperimenterDataOfChoice> serializerExperimenterInput;
+ @Mock
+ OFSerializer<ExperimenterDataOfChoice> serializerMultipartRequestExpCase;
+ @Mock
+ OFSerializer<MeterBandExperimenterCase> serializerMeterBandExpCase;
+ @Mock
+ ConnectionConfigurationImpl config;
+
private TlsConfiguration tlsConfiguration;
private SwitchConnectionProviderImpl provider;
if (protocol != null) {
createConfig(protocol);
}
+ doReturn(diagReg).when(diagStatusService).register(any());
provider = new SwitchConnectionProviderImpl(diagStatusService, config);
}
config.setTransferProtocol(protocol);
}
-
/**
* Test getServerFacade.
*/
/**
* Test shutdown on unconfigured provider.
*/
- @Test(expected = IllegalStateException.class)
+ @Test
public void testShutdownUnconfigured() throws Exception {
startUp(TransportProtocol.TCP);
- provider.shutdown();
+ assertThrows(IllegalStateException.class, provider::shutdown);
}
/**
public void testUnregisterExistingKeys() throws Exception {
startUp(TransportProtocol.TCP);
// -- registerActionSerializer
- final ExperimenterActionSerializerKey key1 =
- new ExperimenterActionSerializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42), TestSubType.VALUE);
+ final var key1 = new ExperimenterActionSerializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ TestSubType.VALUE);
provider.registerActionSerializer(key1, serializer);
assertTrue("Wrong -- unregister ActionSerializer", provider.unregisterSerializer(key1));
assertFalse("Wrong -- unregister ActionSerializer by not existing key",
provider.unregisterSerializer(key1));
// -- registerActionDeserializer
- final ExperimenterActionDeserializerKey key2
- = new ExperimenterActionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
+ final var key2 = new ExperimenterActionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
provider.registerActionDeserializer(key2, deserializer);
assertTrue("Wrong -- unregister ActionDeserializer", provider.unregisterDeserializer(key2));
assertFalse("Wrong -- unregister ActionDeserializer by not existing key",
assertFalse("Wrong -- unregister InstructionSerializer by not existing key",
provider.unregisterSerializer(key3));
// -- registerInstructionDeserializer
- final ExperimenterInstructionDeserializerKey key4 =
- new ExperimenterInstructionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
+ final var key4 = new ExperimenterInstructionDeserializerKey(EncodeConstants.OF_VERSION_1_0, 42L);
provider.registerInstructionDeserializer(key4, deserializer);
assertTrue("Wrong -- unregister InstructionDeserializer", provider.unregisterDeserializer(key4));
assertFalse("Wrong -- unregister InstructionDeserializer by not existing key",
provider.unregisterDeserializer(key4));
// -- registerMatchEntryDeserializer
- final MatchEntryDeserializerKey key5 =
- new MatchEntryDeserializerKey(EncodeConstants.OF_VERSION_1_0, 0x8000, 42);
+ final var key5 = new MatchEntryDeserializerKey(EncodeConstants.OF_VERSION_1_0, 0x8000, 42);
provider.registerMatchEntryDeserializer(key5, deserializer);
assertTrue("Wrong -- unregister MatchEntryDeserializer", provider.unregisterDeserializer(key5));
assertFalse("Wrong -- unregister MatchEntryDeserializer by not existing key",
provider.unregisterDeserializer(key5));
// -- registerErrorDeserializer
- final ExperimenterIdDeserializerKey key6 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
- Uint32.valueOf(42), ErrorMessage.class);
+ final var key6 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ ErrorMessage.class);
provider.registerErrorDeserializer(key6, deserializerError);
assertTrue("Wrong -- unregister ErrorDeserializer", provider.unregisterDeserializer(key6));
assertFalse("Wrong -- unregister ErrorDeserializer by not existing key",
provider.unregisterDeserializer(key6));
// -- registerExperimenterMessageDeserializer
- final ExperimenterIdDeserializerKey key7 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
- Uint32.valueOf(42), ExperimenterMessage.class);
+ final var key7 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ ExperimenterMessage.class);
provider.registerExperimenterMessageDeserializer(key7, deserializerExpMsg);
assertTrue("Wrong -- unregister ExperimenterMessageDeserializer", provider.unregisterDeserializer(key7));
assertFalse("Wrong -- unregister ExperimenterMessageDeserializer by not existing key",
provider.unregisterDeserializer(key7));
// -- registerMultipartReplyMessageDeserializer
- final ExperimenterIdDeserializerKey key8 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
+ final var key8 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
Uint32.valueOf(42), MultipartReplyMessage.class);
provider.registerMultipartReplyMessageDeserializer(key8, deserializerMultipartRplMsg);
assertTrue("Wrong -- unregister MultipartReplyMessageDeserializer",
assertFalse("Wrong -- unregister MultipartReplyMessageDeserializer by not existing key",
provider.unregisterDeserializer(key8));
// -- registerMultipartReplyTFDeserializer
- final ExperimenterIdDeserializerKey key9 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
- Uint32.valueOf(42), MultipartReplyMessage.class);
+ final var key9 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ MultipartReplyMessage.class);
provider.registerMultipartReplyTFDeserializer(key9, deserializer);
assertTrue("Wrong -- unregister MultipartReplyTFDeserializer", provider.unregisterDeserializer(key9));
assertFalse("Wrong -- unregister MultipartReplyTFDeserializer by non existing key",
provider.unregisterDeserializer(key9));
// -- registerQueuePropertyDeserializer
- final ExperimenterIdDeserializerKey key10 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
- Uint32.valueOf(42), QueueProperty.class);
+ final var key10 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ QueueProperty.class);
provider.registerQueuePropertyDeserializer(key10, deserializerQueueProperty);
assertTrue("Wrong -- unregister QueuePropertyDeserializer", provider.unregisterDeserializer(key10));
assertFalse("Wrong -- unregister QueuePropertyDeserializer by not existing key",
provider.unregisterDeserializer(key10));
// -- registerMeterBandDeserializer
- final ExperimenterIdDeserializerKey key11 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0,
- Uint32.valueOf(42), MeterBandExperimenterCase.class);
+ final var key11 = new ExperimenterIdDeserializerKey(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ MeterBandExperimenterCase.class);
provider.registerMeterBandDeserializer(key11, deserializerMeterBandExpCase);
assertTrue("Wrong -- unregister MeterBandDeserializer", provider.unregisterDeserializer(key11));
assertFalse("Wrong -- unregister MeterBandDeserializer by not existing key",
provider.unregisterDeserializer(key11));
// -- registerExperimenterMessageSerializer
- ExperimenterIdSerializerKey<ExperimenterDataOfChoice> key12 =
- new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
- ExperimenterDataOfChoice.class);
+ final var key12 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ ExperimenterDataOfChoice.class);
provider.registerExperimenterMessageSerializer(key12, serializerExperimenterInput);
assertTrue("Wrong -- unregister ExperimenterMessageSerializer", provider.unregisterSerializer(key12));
assertFalse("Wrong -- unregister ExperimenterMessageSerializer by not existing key",
provider.unregisterSerializer(key12));
//registerMultipartRequestSerializer
- ExperimenterIdSerializerKey<ExperimenterDataOfChoice> key13 =
- new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
- ExperimenterDataOfChoice.class);
+ final var key13 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ ExperimenterDataOfChoice.class);
provider.registerMultipartRequestSerializer(key13, serializerMultipartRequestExpCase);
assertTrue("Wrong -- unregister MultipartRequestSerializer", provider.unregisterSerializer(key13));
assertFalse("Wrong -- unregister MultipartRequestSerializer by not existing key",
provider.unregisterSerializer(key13));
// -- registerMultipartRequestTFSerializer
- final ExperimenterIdSerializerKey<TableFeatureProperties> key14 =
- new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
- TableFeatureProperties.class);
+ final var key14 = new ExperimenterIdSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
+ TableFeatureProperties.class);
provider.registerMultipartRequestTFSerializer(key14, serializer);
assertTrue("Wrong -- unregister MultipartRequestTFSerializer", provider.unregisterSerializer(key14));
assertFalse("Wrong -- unregister MultipartRequestTFSerializer by not existing key",
provider.unregisterSerializer(key14));
// -- registerMeterBandSerializer
- final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key15 =
- new ExperimenterIdMeterSubTypeSerializerKey<>(EncodeConstants.OF_VERSION_1_0, Uint32.valueOf(42),
- MeterBandExperimenterCase.class,null);
+ final var key15 = new ExperimenterIdMeterSubTypeSerializerKey<>(EncodeConstants.OF_VERSION_1_0,
+ Uint32.valueOf(42), MeterBandExperimenterCase.class,null);
provider.registerMeterBandSerializer(key15, serializerMeterBandExpCase);
assertTrue("Wrong -- unregister MeterBandSerializer", provider.unregisterSerializer(key15));
assertFalse("Wrong -- unregister MeterBandSerializer by not existing key",
provider.unregisterSerializer(key15));
// -- registerMatchEntrySerializer
- final MatchEntrySerializerKey<OpenflowBasicClass, InPort> key16 =
- new MatchEntrySerializerKey<>(EncodeConstants.OF_VERSION_1_3, OpenflowBasicClass.VALUE, InPort.VALUE);
+ final var key16 = new MatchEntrySerializerKey<>(EncodeConstants.OF_VERSION_1_3, OpenflowBasicClass.VALUE,
+ InPort.VALUE);
provider.registerMatchEntrySerializer(key16, serializer);
assertTrue("Wrong -- unregister MatchEntrySerializer", provider.unregisterSerializer(key16));
assertFalse("Wrong -- unregister MatchEntrySerializer by not existing key",
provider.unregisterSerializer(key15));
// -- registerSerializer
- final MessageTypeKey key17 = new MessageTypeKey<>(EncodeConstants.OF_VERSION_1_3, TestSubType.class);
+ final var key17 = new MessageTypeKey<>(EncodeConstants.OF_VERSION_1_3, TestSubType.class);
provider.registerSerializer(key17, serializer);
// -- registerDeserializer
- final MessageCodeKey key18 = new MessageCodeKey(EncodeConstants.OF_VERSION_1_3, 42, TestSubType.class);
+ final var key18 = new MessageCodeKey(EncodeConstants.OF_VERSION_1_3, 42, TestSubType.class);
provider.registerDeserializer(key18, deserializer);
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.net.InetAddress;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpChannelInitializer;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for UdpHandler.
- *
- * @author madamjak
- */
-@RunWith(MockitoJUnitRunner.class)
-public class UdpHandlerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(UdpHandlerTest.class);
-
- @Mock
- private UdpChannelInitializer udpChannelInitializerMock;
- private UdpHandler udpHandler;
-
- /**
- * Test to create UdpHandler with empty address and zero port.
- */
- @Test
- public void testWithEmptyAddress() throws Exception {
- udpHandler = new UdpHandler(null, 0, () -> { });
- udpHandler.setChannelInitializer(udpChannelInitializerMock);
- assertTrue("Wrong - start server", startupServer(false));
- udpHandler.getIsOnlineFuture().get(1500, TimeUnit.MILLISECONDS);
- assertFalse("Wrong - port has been set to zero", udpHandler.getPort() == 0);
- shutdownServer();
- }
-
- /**
- * Test to create UdpHandler with empty address and zero port on Epoll native transport.
- */
- @Test
- public void testWithEmptyAddressOnEpoll() throws Exception {
- udpHandler = new UdpHandler(null, 0, () -> { });
- udpHandler.setChannelInitializer(udpChannelInitializerMock);
- assertTrue("Wrong - start server", startupServer(true));
- udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
- assertFalse("Wrong - port has been set to zero", udpHandler.getPort() == 0);
- shutdownServer();
- }
-
- /**
- * Test to create UdpHandler with fill address and given port.
- */
- @Test
- public void testWithAddressAndPort() throws Exception {
- int port = 9874;
- udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
- udpHandler.setChannelInitializer(udpChannelInitializerMock);
- assertTrue("Wrong - start server", startupServer(false));
- udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
- assertEquals("Wrong - bad port number has been set", port, udpHandler.getPort());
- shutdownServer();
- }
-
- /**
- * Test to create UdpHandler with fill address and given port on Epoll native transport.
- */
- @Test
- public void testWithAddressAndPortOnEpoll() throws Exception {
- int port = 9874;
- udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
- udpHandler.setChannelInitializer(udpChannelInitializerMock);
- assertTrue("Wrong - start server", startupServer(true));
- udpHandler.getIsOnlineFuture().get(1500,TimeUnit.MILLISECONDS);
- assertEquals("Wrong - bad port number has been set", port, udpHandler.getPort());
- shutdownServer();
- }
-
- private Boolean startupServer(final boolean isEpollEnabled)
- throws InterruptedException, ExecutionException {
- final var online = udpHandler.getIsOnlineFuture();
- /**
- * Test EPoll based native transport if isEpollEnabled is true.
- * Else use Nio based transport.
- */
- udpHandler.initiateEventLoopGroups(null, isEpollEnabled);
- new Thread(udpHandler).start();
-
- try {
- online.get(10, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- LOG.warn("Timeout while waiting for UDP handler to start", e);
- }
-
- return online.isDone();
- }
-
- private void shutdownServer() throws InterruptedException, ExecutionException, TimeoutException {
- final var shutdownRet = udpHandler.shutdown() ;
- assertNull("Wrong - shutdown failed", shutdownRet.get(10, TimeUnit.SECONDS));
- }
-}
import org.opendaylight.openflowjava.protocol.impl.clients.UdpSimpleClient;
import org.opendaylight.openflowjava.protocol.impl.clients.WaitForMessageEvent;
import org.opendaylight.openflowjava.protocol.impl.core.SwitchConnectionProviderImpl;
-import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler;
-import org.opendaylight.openflowjava.protocol.impl.core.UdpHandler;
import org.opendaylight.openflowjava.protocol.impl.core.connection.ConnectionConfigurationImpl;
import org.opendaylight.openflowjava.util.ByteBufUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.KeystoreType;
switchConnectionProvider = new SwitchConnectionProviderImpl(diagStatusService, connConfig);
switchConnectionProvider.startup(mockPlugin).get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- if (protocol.equals(TransportProtocol.TCP) || protocol.equals(TransportProtocol.TLS)) {
- final TcpHandler tcpHandler = (TcpHandler) switchConnectionProvider.getServerFacade();
- port = tcpHandler.getPort();
- } else {
- final UdpHandler udpHandler = (UdpHandler) switchConnectionProvider.getServerFacade();
- port = udpHandler.getPort();
- }
+ port = switchConnectionProvider.getServerFacade().localAddress().getPort();
}
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
switchConnectionProvider.close();
LOGGER.debug("\n ending test -------------------------------");
}