Bug 5596 Added cluster provider
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
index 4908331a52c274ff284839b3dd4ae0edc1019a13..ba8f806c083633de38305fc5cef8122a4c72df37 100644 (file)
@@ -17,6 +17,11 @@ import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
@@ -28,10 +33,13 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
@@ -48,14 +56,14 @@ import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 27.3.2015.
- */
 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginExtensionRegistratorProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
@@ -63,6 +71,10 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
     private final int rpcRequestsQuota;
     private final long globalNotificationQuota;
+    private final ConvertorManager convertorManager;
+    private long barrierInterval;
+    private int barrierCountLimit;
+    private long echoReplyTimeout;
     private DeviceManager deviceManager;
     private RoleManager roleManager;
     private RpcManager rpcManager;
@@ -80,11 +92,33 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
     private boolean switchFeaturesMandatory = false;
     private boolean isStatisticsPollingOff = false;
     private boolean isStatisticsRpcEnabled;
-
-    public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, final Long globalNotificationQuota) {
+    private boolean isNotificationFlowRemovedOff = false;
+    private Map<String,Object>  managedProperties;
+
+    private final LifecycleConductor conductor;
+    private final ThreadPoolExecutor threadPool;
+    private ClusterSingletonServiceProvider singletonServicesProvider;
+
+    public OpenFlowPluginProviderImpl(final long rpcRequestsQuota,
+                                      final long globalNotificationQuota,
+                                      final int threadPoolMinThreads,
+                                      final int threadPoolMaxThreads,
+                                      final long threadPoolTimeout) {
         Preconditions.checkArgument(rpcRequestsQuota > 0 && rpcRequestsQuota <= Integer.MAX_VALUE, "rpcRequestQuota has to be in range <1,%s>", Integer.MAX_VALUE);
         this.rpcRequestsQuota = (int) rpcRequestsQuota;
         this.globalNotificationQuota = Preconditions.checkNotNull(globalNotificationQuota);
+
+        // Creates a thread pool that creates new threads as needed, but will reuse previously
+        // constructed threads when they are available.
+        // Threads that have not been used for x seconds are terminated and removed from the cache.
+        threadPool = new ThreadPoolLoggingExecutor(
+                Preconditions.checkNotNull(threadPoolMinThreads),
+                Preconditions.checkNotNull(threadPoolMaxThreads),
+                Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS,
+                new SynchronousQueue<>(), "ofppool");
+
+        convertorManager = ConvertorManagerFactory.createDefaultManager();
+        conductor = new LifecycleConductorImpl(messageIntelligenceAgency, convertorManager);
     }
 
     @Override
@@ -114,7 +148,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
             }
 
             @Override
-            public void onFailure(final Throwable t) {
+            public void onFailure(@Nonnull final Throwable t) {
                 LOG.warn("Some switchConnectionProviders failed to start.", t);
             }
         });
@@ -130,6 +164,31 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
         this.entityOwnershipService = entityOwnershipService;
     }
 
+    @Override
+    public void setBarrierCountLimit(final int barrierCountLimit) {
+        this.barrierCountLimit = barrierCountLimit;
+    }
+
+    @Override
+    public void setBarrierInterval(final long barrierTimeoutLimit) {
+        this.barrierInterval = barrierTimeoutLimit;
+    }
+
+    @Override
+    public void setEchoReplyTimeout(final long echoReplyTimeout) {
+        this.echoReplyTimeout = echoReplyTimeout;
+    }
+
+    @Override
+    public void setNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
+        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+    }
+
+    public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) {
+        this.singletonServicesProvider = singletonServicesProvider;
+    }
+
+
     @Override
     public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) {
         this.switchFeaturesMandatory = switchFeaturesMandatory;
@@ -156,44 +215,76 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
     @Override
     public void initialize() {
-
         Preconditions.checkNotNull(dataBroker, "missing data broker");
         Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
         Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
+        Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
 
         extensionConverterManager = new ExtensionConverterManagerImpl();
         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
         // TODO: rewrite later!
         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
 
-        connectionManager = new ConnectionManagerImpl();
+        connectionManager = new ConnectionManagerImpl(echoReplyTimeout, threadPool);
 
         registerMXBean(messageIntelligenceAgency);
 
-        deviceManager = new DeviceManagerImpl(dataBroker, messageIntelligenceAgency, switchFeaturesMandatory, globalNotificationQuota);
+        deviceManager = new DeviceManagerImpl(dataBroker,
+                globalNotificationQuota,
+                switchFeaturesMandatory,
+                barrierInterval,
+                barrierCountLimit,
+                conductor,
+                isNotificationFlowRemovedOff,
+                convertorManager,
+                singletonServicesProvider);
+        ((ExtensionConverterProviderKeeper) conductor).setExtensionConverterProvider(extensionConverterManager);
         ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
-        roleManager = new RoleManagerImpl(rpcProviderRegistry, entityOwnershipService, switchFeaturesMandatory);
-        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff);
-        rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota);
 
-        // CM -> DM -> Role -> SM -> RPC -> DM
+        conductor.setSafelyManager(deviceManager);
+        conductor.setNotificationPublishService(notificationPublishService);
+
+        roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, conductor);
+        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor, convertorManager);
+        conductor.setSafelyManager(statisticsManager);
+
+        rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor, extensionConverterManager, convertorManager);
+        conductor.setSafelyManager(rpcManager);
+
+        roleManager.addRoleChangeListener((RoleChangeListener) conductor);
+
+        /* Initialization Phase ordering - OFP Device Context suite */
+        // CM -> DM -> SM -> RPC -> Role -> DM
         connectionManager.setDeviceConnectedHandler(deviceManager);
-        deviceManager.setDeviceInitializationPhaseHandler(roleManager);
-        roleManager.setDeviceInitializationPhaseHandler(statisticsManager);
+        deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
         statisticsManager.setDeviceInitializationPhaseHandler(rpcManager);
-        rpcManager.setDeviceInitializationPhaseHandler(deviceManager);
-        rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
-        rpcManager.setNotificationPublishService(notificationPublishService);
+        rpcManager.setDeviceInitializationPhaseHandler(roleManager);
+        roleManager.setDeviceInitializationPhaseHandler(deviceManager);
+
+        /* Termination Phase ordering - OFP Device Context suite */
+        deviceManager.setDeviceTerminationPhaseHandler(rpcManager);
+        rpcManager.setDeviceTerminationPhaseHandler(statisticsManager);
+        statisticsManager.setDeviceTerminationPhaseHandler(roleManager);
+        roleManager.setDeviceTerminationPhaseHandler(deviceManager);
 
-        deviceManager.setNotificationService(this.notificationProviderService);
-        deviceManager.setNotificationPublishService(this.notificationPublishService);
+        rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
 
-        TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager);
+        TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
         deviceManager.initialize();
 
         startSwitchConnections();
     }
 
+    @Override
+    public void update(Map<String,Object> props) {
+        LOG.debug("Update managed properties = {}", props.toString());
+        this.managedProperties = props;
+
+        if(deviceManager != null && props.containsKey("notification-flow-removed-off")) {
+            deviceManager.setIsNotificationFlowRemovedOff(Boolean.valueOf(props.get("notification-flow-removed-off").toString()));
+        }
+    }
+
     private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try {
@@ -232,9 +323,16 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
     @Override
     public void close() throws Exception {
+        //TODO: consider wrapping each manager into try-catch
         deviceManager.close();
         rpcManager.close();
         statisticsManager.close();
+
+        // TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down
+        // TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts)
         roleManager.close();
+
+        // Manually shutdown all remaining running threads in pool
+        threadPool.shutdown();
     }
 }