<type>xml</type>
<classifier>features</classifier>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.infrautils</groupId>
+ <artifactId>odl-infrautils-diagstatus</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <type>xml</type>
+ <classifier>features</classifier>
+ </dependency>
<!-- bundle dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.infrautils</groupId>
+ <artifactId>diagstatus-api</artifactId>
+ <version>${infrautils.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
+import javax.inject.Inject;
import javax.inject.Singleton;
+import org.apache.aries.blueprint.annotation.service.Reference;
import org.apache.aries.blueprint.annotation.service.Service;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
@Service(classes = SwitchConnectionProviderFactory.class)
public class SwitchConnectionProviderFactoryImpl implements SwitchConnectionProviderFactory {
+ private final DiagStatusService diagStatusService;
+
+ @Inject
+ public SwitchConnectionProviderFactoryImpl(@Reference DiagStatusService diagStatusService) {
+ this.diagStatusService = diagStatusService;
+ }
+
@Override
public SwitchConnectionProvider newInstance(SwitchConnectionConfig config) {
- return new SwitchConnectionProviderImpl(new ConnectionConfigurationImpl(config));
+ return new SwitchConnectionProviderImpl(new ConnectionConfigurationImpl(config), diagStatusService);
}
private static InetAddress getInetAddress(final IpAddress address) throws UnknownHostException {
package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
+ private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
+ private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
private SwitchConnectionHandler switchConnectionHandler;
private ServerFacade serverFacade;
private final DeserializerRegistry deserializerRegistry;
private final DeserializationFactory deserializationFactory;
private final ListeningExecutorService listeningExecutorService;
+ private final DiagStatusService diagStatusService;
+ private final String diagStatusIdentifier;
+ private final String threadName;
private TcpConnectionInitializer connectionInitializer;
- public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) {
- this.listeningExecutorService = Executors
- .newListeningSingleThreadExecutor("OFP-SwitchConnectionProvider-Udp/TcpHandler", LOG);
+ public SwitchConnectionProviderImpl(
+ @Nullable ConnectionConfiguration connConfig, DiagStatusService diagStatusService) {
this.connConfig = connConfig;
+ String connectionSuffix = createConnectionSuffix(connConfig);
+
+ this.diagStatusService = diagStatusService;
+ this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+ diagStatusService.register(diagStatusIdentifier);
+
+ this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
+ this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
+
serializerRegistry = new SerializerRegistryImpl();
if (connConfig != null) {
serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
deserializationFactory = new DeserializationFactory(deserializerRegistry);
}
+ // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
+ private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
+ if (config != null && config.getAddress() != null) {
+ return "-" + config.getAddress().toString() + "_" + config.getPort();
+ } else if (config != null) {
+ return "_" + config.getPort();
+ } else {
+ return "-null-config";
+ }
+ }
+
@Override
public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
LOG.debug("setSwitchConnectionHandler");
if (switchConnectionHandler == null) {
throw new IllegalStateException("SwitchConnectionHandler is not set");
}
- listeningExecutorService.submit(serverFacade);
+ Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ diagStatusService.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+ }
+
+ @Override
+ public void onSuccess(@Nullable Object nullResult) {
+ diagStatusService.report(new ServiceDescriptor(
+ diagStatusIdentifier, ServiceState.ERROR, threadName + " terminated"));
+ }
+ } , MoreExecutors.directExecutor());
result = serverFacade.getIsOnlineFuture();
} catch (RuntimeException e) {
final SettableFuture<Boolean> exResult = SettableFuture.create();
private ServerFacade createAndConfigureServer() {
LOG.debug("Configuring ..");
- ServerFacade server = null;
+ ServerFacade server;
final ChannelInitializerFactory factory = new ChannelInitializerFactory();
factory.setSwitchConnectionHandler(switchConnectionHandler);
factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
boolean isEpollEnabled = Epoll.isAvailable();
if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
- server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
+ server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+ .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
((TcpHandler) server).setChannelInitializer(channelInitializer);
((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run();
} else if (TransportProtocol.UDP.equals(transportProtocol)) {
- server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+ server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+ .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
} else {
private int port;
private String address;
private final InetAddress startupAddress;
+ private final Runnable readyRunnable;
private EventLoopGroup workerGroup;
private EventLoopGroup bossGroup;
private final SettableFuture<Boolean> isOnlineFuture;
*
* @param port listening port of TCPHandler server
*/
- public TcpHandler(final int port) {
- this(null, port);
+ public TcpHandler(final int port, Runnable readyRunnable) {
+ this(null, port, readyRunnable);
}
/**
* @param address listening address of TCPHandler server
* @param port listening port of TCPHandler server
*/
- public TcpHandler(final InetAddress address, final int port) {
+ public TcpHandler(final InetAddress address, final int port, Runnable readyRunnable) {
this.port = port;
this.startupAddress = address;
isOnlineFuture = SettableFuture.create();
+ this.readyRunnable = readyRunnable;
}
/**
LOG.debug("address from tcphandler: {}", address);
isOnlineFuture.set(true);
LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
+
+ readyRunnable.run();
+
+ // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for port {} shutdown", port, e);
*/
public final class UdpHandler implements ServerFacade {
- private static final Logger LOG = LoggerFactory
- .getLogger(UdpHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(UdpHandler.class);
+
private int port;
private EventLoopGroup group;
private final InetAddress startupAddress;
+ private final Runnable readyRunnable;
private final SettableFuture<Boolean> isOnlineFuture;
private UdpChannelInitializer channelInitializer;
private ThreadConfiguration threadConfig;
*
* @param port listening port of UdpHandler server
*/
- public UdpHandler(final int port) {
- this(null, port);
+ public UdpHandler(final int port, Runnable readyRunnable) {
+ this(null, port, readyRunnable);
}
/**
* @param address listening address of UdpHandler server
* @param port listening port of UdpHandler server
*/
- public UdpHandler(final InetAddress address, final int port) {
+ public UdpHandler(final InetAddress address, final int port, Runnable readyRunnable) {
this.port = port;
this.startupAddress = address;
isOnlineFuture = SettableFuture.create();
+ this.readyRunnable = readyRunnable;
}
@Override
LOG.debug("Address from udpHandler: {}", address);
isOnlineFuture.set(true);
LOG.info("Switch listener started and ready to accept incoming udp connections on port: {}", port);
+ readyRunnable.run();
+ // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for port {} shutdown", port, e);
*/
@Test
public void testRunWithNullAddress() throws IOException, InterruptedException, ExecutionException {
-
- tcpHandler = new TcpHandler(null, 0);
+ tcpHandler = new TcpHandler(null, 0, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
assertEquals("failed to start server", true, startupServer(false)) ;
*/
@Test
public void testRunWithNullAddressOnEpoll() throws IOException, InterruptedException, ExecutionException {
-
- tcpHandler = new TcpHandler(null, 0);
+ tcpHandler = new TcpHandler(null, 0, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
//Use Epoll native transport
*/
@Test
public void testRunWithAddress() throws IOException, InterruptedException, ExecutionException {
-
- tcpHandler = new TcpHandler(serverAddress, 0);
+ tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
assertEquals("failed to start server", true, startupServer(false)) ;
*/
@Test
public void testRunWithAddressOnEpoll() throws IOException, InterruptedException, ExecutionException {
-
- tcpHandler = new TcpHandler(serverAddress, 0);
+ tcpHandler = new TcpHandler(serverAddress, 0, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
//Use Epoll native transport
@Test
public void testRunWithEncryption() throws InterruptedException, IOException, ExecutionException {
int serverPort = 28001;
- tcpHandler = new TcpHandler(serverAddress, serverPort);
+ tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
assertEquals("failed to start server", true, startupServer(false));
@Test
public void testRunWithEncryptionOnEpoll() throws InterruptedException, IOException, ExecutionException {
int serverPort = 28001;
- tcpHandler = new TcpHandler(serverAddress, serverPort);
+ tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
//Use Epoll native transport
try {
firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
- tcpHandler = new TcpHandler(serverAddress, serverPort);
+ tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
tcpHandler.initiateEventLoopGroups(null, false);
tcpHandler.run();
try {
firstBinder.bind(new InetSocketAddress(serverAddress, serverPort));
- tcpHandler = new TcpHandler(serverAddress, serverPort);
+ tcpHandler = new TcpHandler(serverAddress, serverPort, () -> { });
tcpHandler.setChannelInitializer(mockChannelInitializer);
//Use Epoll native transport
tcpHandler.initiateEventLoopGroups(null, true);
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl;
* @author michal.polkorab
*/
public class SwitchConnectionProviderImpl02Test {
+ @Mock DiagStatusService diagStatusService;
@Mock SwitchConnectionHandler handler;
@Mock OFGeneralSerializer serializer;
@Mock OFGeneralDeserializer deserializer;
if (protocol != null) {
createConfig(protocol);
}
- provider = new SwitchConnectionProviderImpl(config);
+ provider = new SwitchConnectionProviderImpl(config, diagStatusService);
}
private void createConfig(final TransportProtocol protocol) throws UnknownHostException {
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl;
public class SwitchConnectionProviderImplTest {
@Mock SwitchConnectionHandler handler;
+ @Mock DiagStatusService diagStatusService;
private static final int SWITCH_IDLE_TIMEOUT = 2000;
private static final int WAIT_TIMEOUT = 2000;
if (protocol != null) {
createConfig(protocol);
}
- provider = new SwitchConnectionProviderImpl(config);
+ provider = new SwitchConnectionProviderImpl(config, diagStatusService);
}
private void createConfig(final TransportProtocol protocol) throws UnknownHostException {
* Tests provider startup - without configuration and {@link SwitchConnectionHandler}.
*/
@Test
- public void testStartup1() {
- provider = new SwitchConnectionProviderImpl(config);
+ public void testStartup1() throws UnknownHostException {
+ startUp(null);
final ListenableFuture<Boolean> future = provider.startup();
try {
future.get(WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
*/
@Test
public void testWithEmptyAddress() throws Exception {
- udpHandler = new UdpHandler(null, 0);
+ udpHandler = new UdpHandler(null, 0, () -> { });
udpHandler.setChannelInitializer(udpChannelInitializerMock);
Assert.assertTrue("Wrong - start server", startupServer(false));
try {
*/
@Test
public void testWithEmptyAddressOnEpoll() throws Exception {
- udpHandler = new UdpHandler(null, 0);
+ udpHandler = new UdpHandler(null, 0, () -> { });
udpHandler.setChannelInitializer(udpChannelInitializerMock);
Assert.assertTrue("Wrong - start server", startupServer(true));
try {
@Test
public void testWithAddressAndPort() throws Exception {
int port = 9874;
- udpHandler = new UdpHandler(InetAddress.getLocalHost(), port);
+ udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
udpHandler.setChannelInitializer(udpChannelInitializerMock);
Assert.assertTrue("Wrong - start server", startupServer(false));
try {
@Test
public void testWithAddressAndPortOnEpoll() throws Exception {
int port = 9874;
- udpHandler = new UdpHandler(InetAddress.getLocalHost(), port);
+ udpHandler = new UdpHandler(InetAddress.getLocalHost(), port, () -> { });
udpHandler.setChannelInitializer(udpChannelInitializerMock);
Assert.assertTrue("Wrong - start server", startupServer(true));
try {
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfigurationImpl;
import org.opendaylight.openflowjava.protocol.impl.clients.ClientEvent;
connConfig.setTransferProtocol(protocol);
mockPlugin = new MockPlugin();
- switchConnectionProvider = new SwitchConnectionProviderImpl(connConfig);
+ switchConnectionProvider = new SwitchConnectionProviderImpl(connConfig, Mockito.mock(DiagStatusService.class));
switchConnectionProvider.setSwitchConnectionHandler(mockPlugin);
switchConnectionProvider.startup().get(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
if (protocol.equals(TransportProtocol.TCP) || protocol.equals(TransportProtocol.TLS)) {