report diagstatus from UdpHandler/TcpHandler on Netty thread terminate
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / SwitchConnectionProviderImpl.java
index c7bc5cc18a7563abb90fa241dd7c3d2c37cc8757..5d135be153f61a51bddcfa44a9ed505820a5dd9f 100755 (executable)
@@ -9,10 +9,19 @@
 
 package org.opendaylight.openflowjava.protocol.impl.core;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
@@ -60,8 +69,10 @@ import org.slf4j.LoggerFactory;
  */
 public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SwitchConnectionProviderImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
+    private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
+    private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
+
     private SwitchConnectionHandler switchConnectionHandler;
     private ServerFacade serverFacade;
     private final ConnectionConfiguration connConfig;
@@ -69,10 +80,24 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     private final SerializerRegistry serializerRegistry;
     private final DeserializerRegistry deserializerRegistry;
     private final DeserializationFactory deserializationFactory;
+    private final ListeningExecutorService listeningExecutorService;
+    private final DiagStatusService diagStatusService;
+    private final String diagStatusIdentifier;
+    private final String threadName;
     private TcpConnectionInitializer connectionInitializer;
 
-    public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) {
+    public SwitchConnectionProviderImpl(
+            @Nullable ConnectionConfiguration connConfig, DiagStatusService diagStatusService) {
         this.connConfig = connConfig;
+        String connectionSuffix = createConnectionSuffix(connConfig);
+
+        this.diagStatusService = diagStatusService;
+        this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+        diagStatusService.register(diagStatusIdentifier);
+
+        this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
+        this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
+
         serializerRegistry = new SerializerRegistryImpl();
         if (connConfig != null) {
             serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
@@ -84,6 +109,17 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         deserializationFactory = new DeserializationFactory(deserializerRegistry);
     }
 
+    // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
+    private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
+        if (config != null && config.getAddress() != null) {
+            return "-" + config.getAddress().toString() + "_" + config.getPort();
+        } else if (config != null) {
+            return "_" + config.getPort();
+        } else {
+            return "-null-config";
+        }
+    }
+
     @Override
     public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
         LOG.debug("setSwitchConnectionHandler");
@@ -97,7 +133,9 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
             LOG.warn("Can not shutdown - not configured or started");
             throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
         }
-        return serverFacade.shutdown();
+        ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
+        Executors.shutdownAndAwaitTermination(listeningExecutorService);
+        return serverFacadeShutdownFuture;
     }
 
     @Override
@@ -110,7 +148,19 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
             if (switchConnectionHandler == null) {
                 throw new IllegalStateException("SwitchConnectionHandler is not set");
             }
-            new Thread(serverFacade).start();
+            Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
+
+                @Override
+                public void onFailure(Throwable throwable) {
+                    diagStatusService.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+                }
+
+                @Override
+                public void onSuccess(@Nullable Object nullResult) {
+                    diagStatusService.report(new ServiceDescriptor(
+                            diagStatusIdentifier, ServiceState.ERROR, threadName + " terminated"));
+                }
+            } , MoreExecutors.directExecutor());
             result = serverFacade.getIsOnlineFuture();
         } catch (RuntimeException e) {
             final SettableFuture<Boolean> exResult = SettableFuture.create();
@@ -122,7 +172,7 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
 
     private ServerFacade createAndConfigureServer() {
         LOG.debug("Configuring ..");
-        ServerFacade server = null;
+        ServerFacade server;
         final ChannelInitializerFactory factory = new ChannelInitializerFactory();
         factory.setSwitchConnectionHandler(switchConnectionHandler);
         factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
@@ -130,6 +180,7 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         factory.setSerializationFactory(serializationFactory);
         factory.setDeserializationFactory(deserializationFactory);
         factory.setUseBarrier(connConfig.useBarrier());
+        factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
         final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
 
         // Check if Epoll native transport is available.
@@ -137,7 +188,8 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         boolean isEpollEnabled = Epoll.isAvailable();
 
         if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
-            server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
+            server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+                            .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
             final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
             ((TcpHandler) server).setChannelInitializer(channelInitializer);
             ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
@@ -147,7 +199,8 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
             connectionInitializer.setChannelInitializer(channelInitializer);
             connectionInitializer.run();
         } else if (TransportProtocol.UDP.equals(transportProtocol)) {
-            server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+            server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+                    .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
             ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
             ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
         } else {