Merge "Report (TCP) port number for switches"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
index c919099948de5c035f3686c5da1c5a53d6a13e63..421c60ae8c31d54970c644b237dc34f887d58cc4 100644 (file)
@@ -11,19 +11,24 @@ package org.opendaylight.openflowplugin.impl;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import java.lang.management.ManagementFactory;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
 import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -31,7 +36,6 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
@@ -71,20 +75,28 @@ import org.slf4j.LoggerFactory;
 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginConfigurationService, OpenFlowPluginExtensionRegistratorProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
-    private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+
     private static final int TICKS_PER_WHEEL = 500; // 0.5 sec.
     private static final long TICK_DURATION = 10;
     private static final String POOL_NAME = "ofppool";
 
+    private static final MessageIntelligenceAgency MESSAGE_INTELLIGENCE_AGENCY = new MessageIntelligenceAgencyImpl();
+    private static final String MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME = String
+            .format("%s:type=%s",
+                    MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
+                    MessageIntelligenceAgencyMXBean.class.getSimpleName());
+
     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
-    private final NotificationService notificationProviderService;
     private final NotificationPublishService notificationPublishService;
     private final ExtensionConverterManager extensionConverterManager;
     private final DataBroker dataBroker;
     private final Collection<SwitchConnectionProvider> switchConnectionProviders;
     private final DeviceInitializerProvider deviceInitializerProvider;
     private final ConvertorManager convertorManager;
-    private final ContextChainHolder contextChainHolder;
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private final ClusterSingletonServiceProvider singletonServicesProvider;
+    private final EntityOwnershipService entityOwnershipService;
+    private ContextChainHolder contextChainHolder;
     private int rpcRequestsQuota;
     private long globalNotificationQuota;
     private long barrierInterval;
@@ -92,7 +104,6 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
     private long echoReplyTimeout;
     private DeviceManager deviceManager;
     private RpcManager rpcManager;
-    private RpcProviderRegistry rpcProviderRegistry;
     private StatisticsManager statisticsManager;
     private ConnectionManager connectionManager;
     private boolean switchFeaturesMandatory;
@@ -104,32 +115,28 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
     private long maximumTimerDelay;
     private boolean useSingleLayerSerialization;
     private ThreadPoolExecutor threadPool;
-    private ClusterSingletonServiceProvider singletonServicesProvider;
     private int threadPoolMinThreads;
     private int threadPoolMaxThreads;
     private long threadPoolTimeout;
     private boolean initialized = false;
 
     public static MessageIntelligenceAgency getMessageIntelligenceAgency() {
-        return messageIntelligenceAgency;
+        return MESSAGE_INTELLIGENCE_AGENCY;
     }
 
-    public OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
-                                      final DataBroker dataBroker,
-                                      final RpcProviderRegistry rpcProviderRegistry,
-                                      final NotificationService notificationProviderService,
-                                      final NotificationPublishService notificationPublishService,
-                                      final ClusterSingletonServiceProvider singletonServiceProvider,
-                                      final EntityOwnershipService entityOwnershipService) {
+    OpenFlowPluginProviderImpl(final List<SwitchConnectionProvider> switchConnectionProviders,
+                               final DataBroker dataBroker,
+                               final RpcProviderRegistry rpcProviderRegistry,
+                               final NotificationPublishService notificationPublishService,
+                               final ClusterSingletonServiceProvider singletonServiceProvider,
+                               final EntityOwnershipService entityOwnershipService) {
         this.switchConnectionProviders = switchConnectionProviders;
         this.dataBroker = dataBroker;
         this.rpcProviderRegistry = rpcProviderRegistry;
-        this.notificationProviderService = notificationProviderService;
         this.notificationPublishService = notificationPublishService;
         this.singletonServicesProvider = singletonServiceProvider;
+        this.entityOwnershipService = entityOwnershipService;
         convertorManager = ConvertorManagerFactory.createDefaultManager();
-        contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
-        contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
         extensionConverterManager = new ExtensionConverterManagerImpl();
         deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
     }
@@ -137,7 +144,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
     private void startSwitchConnections() {
         Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
-            // Inject OpenflowPlugin custom serializers and deserializers into OpenflowJava
+            // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
             if (useSingleLayerSerialization) {
                 SerializerInjector.injectSerializers(switchConnectionProvider);
                 DeserializerInjector.injectDeserializers(switchConnectionProvider);
@@ -155,19 +162,40 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
             }
 
             @Override
-            public void onFailure(@Nonnull final Throwable t) {
-                LOG.warn("Some switchConnectionProviders failed to start.", t);
+            public void onFailure(@Nonnull final Throwable throwable) {
+                LOG.warn("Some switchConnectionProviders failed to start.", throwable);
+            }
+        });
+    }
+
+    private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
+        final ListenableFuture<List<Boolean>> listListenableFuture = Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
+            // Revert deserializers to their original state
+            if (useSingleLayerSerialization) {
+                DeserializerInjector.revertDeserializers(switchConnectionProvider);
+            }
+
+            // Shutdown switch connection provider
+            return switchConnectionProvider.shutdown();
+        }).collect(Collectors.toSet()));
+
+        Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
+            @Override
+            public void onSuccess(final List<Boolean> result) {
+                LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
             }
         });
+
+        return listListenableFuture;
     }
 
     @Override
     public void initialize() {
-        Preconditions.checkNotNull(dataBroker, "missing data broker");
-        Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
-        Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
-        Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
-
         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
         // TODO: rewrite later!
         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
@@ -181,10 +209,14 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
                 Preconditions.checkNotNull(threadPoolTimeout),
                 TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
 
+
+        contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer, threadPool);
+        contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
+
         connectionManager = new ConnectionManagerImpl(threadPool);
         connectionManager.setEchoReplyTimeout(echoReplyTimeout);
 
-        registerMXBean(messageIntelligenceAgency);
+        registerMXBean(MESSAGE_INTELLIGENCE_AGENCY, MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
 
         contextChainHolder.addSingletonServicesProvider(singletonServicesProvider);
 
@@ -241,8 +273,6 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
             if (Objects.nonNull(propertyType)) {
                 updateProperty(propertyType, value);
-            } else if (!key.equals("service.pid") && !key.equals("felix.fileinstall.filename")) {
-                LOG.warn("Unsupported configuration property '{}={}'", key, value);
             }
         });
     }
@@ -472,25 +502,65 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         initialized = false;
-        //TODO: consider wrapping each manager into try-catch
-        deviceManager.close();
-        rpcManager.close();
-        statisticsManager.close();
 
-        // Manually shutdown all remaining running threads in pool
-        threadPool.shutdown();
+        try {
+            shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
+        }
+
+        gracefulShutdown(contextChainHolder);
+        gracefulShutdown(deviceManager);
+        gracefulShutdown(rpcManager);
+        gracefulShutdown(statisticsManager);
+        gracefulShutdown(threadPool);
+        gracefulShutdown(hashedWheelTimer);
+        unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
     }
 
-    private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
+    private static void gracefulShutdown(final AutoCloseable closeable) {
+        if (Objects.isNull(closeable)) {
+            return;
+        }
+
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            LOG.warn("Failed to shutdown {} gracefully.", closeable);
+        }
+    }
+
+    private static void gracefulShutdown(final Timer timer) {
+        if (Objects.isNull(timer)) {
+            return;
+        }
+
+        try {
+            timer.stop();
+        } catch (Exception e) {
+            LOG.warn("Failed to shutdown {} gracefully.", timer);
+        }
+    }
+
+    private static void gracefulShutdown(final ThreadPoolExecutor threadPoolExecutor) {
+        if (Objects.isNull(threadPoolExecutor)) {
+            return;
+        }
+
+        try {
+            threadPoolExecutor.shutdownNow();
+        } catch (Exception e) {
+            LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor);
+        }
+    }
+
+    private static void registerMXBean(final Object bean, final String beanName) {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
         try {
-            final String pathToMxBean = String.format("%s:type=%s",
-                    MessageIntelligenceAgencyMXBean.class.getPackage().getName(),
-                    MessageIntelligenceAgencyMXBean.class.getSimpleName());
-            final ObjectName name = new ObjectName(pathToMxBean);
-            mbs.registerMBean(messageIntelligenceAgency, name);
+            mbs.registerMBean(bean, new ObjectName(beanName));
         } catch (MalformedObjectNameException
                 | NotCompliantMBeanException
                 | MBeanRegistrationException
@@ -498,4 +568,16 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
             LOG.warn("Error registering MBean {}", e);
         }
     }
+
+    private static void unregisterMXBean(final String beanName) {
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+        try {
+            mbs.unregisterMBean(new ObjectName(beanName));
+        } catch (InstanceNotFoundException
+                | MBeanRegistrationException
+                | MalformedObjectNameException e) {
+            LOG.warn("Error unregistering MBean {}", e);
+        }
+    }
 }