Merge "Bug 6110: Fixed bugs in statistics manager due to race condition." into stable...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / OpenFlowPluginProviderImpl.java
index ab64a090c49b745e31b2bd889fe93d98afbe428b..b3953549589aa50163d7d62a15e7d03ad6a3da17 100644 (file)
@@ -17,6 +17,10 @@ import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.MBeanRegistrationException;
@@ -29,13 +33,11 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginProvider;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
@@ -45,29 +47,40 @@ import org.opendaylight.openflowplugin.extension.api.OpenFlowPluginExtensionRegi
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterManager;
 import org.opendaylight.openflowplugin.impl.connection.ConnectionManagerImpl;
 import org.opendaylight.openflowplugin.impl.device.DeviceManagerImpl;
-import org.opendaylight.openflowplugin.impl.role.RoleManagerImpl;
 import org.opendaylight.openflowplugin.impl.rpc.RpcManagerImpl;
 import org.opendaylight.openflowplugin.impl.statistics.StatisticsManagerImpl;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyMXBean;
 import org.opendaylight.openflowplugin.impl.util.TranslatorLibraryUtil;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
 import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterManagerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import io.netty.util.HashedWheelTimer;
 
 public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenFlowPluginExtensionRegistratorProvider {
 
     private static final Logger LOG = LoggerFactory.getLogger(OpenFlowPluginProviderImpl.class);
     private static final MessageIntelligenceAgency messageIntelligenceAgency = new MessageIntelligenceAgencyImpl();
+    private static final int TICKS_PER_WHEEL = 500;
+    // 0.5 sec.
+    private static final long TICK_DURATION = 10;
+    private static final Integer DEFAULT_BARRIER_COUNT = 25600;
+    private static final Long DEFAULT_ECHO_TIMEOUT = 2000L;
+    private static final Long DEFAULT_BARRIER_TIMEOUT = 500L;
+
+    private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
 
     private final int rpcRequestsQuota;
     private final long globalNotificationQuota;
+    private final ConvertorManager convertorManager;
     private long barrierInterval;
     private int barrierCountLimit;
     private long echoReplyTimeout;
     private DeviceManager deviceManager;
-    private RoleManager roleManager;
     private RpcManager rpcManager;
     private RpcProviderRegistry rpcProviderRegistry;
     private StatisticsManager statisticsManager;
@@ -75,32 +88,48 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
     private NotificationService notificationProviderService;
     private NotificationPublishService notificationPublishService;
     private EntityOwnershipService entityOwnershipService;
-
+    private ClusterSingletonServiceProvider singletonServicesProvider;
     private ExtensionConverterManager extensionConverterManager;
-
     private DataBroker dataBroker;
     private Collection<SwitchConnectionProvider> switchConnectionProviders;
     private boolean switchFeaturesMandatory = false;
-    private boolean isStatisticsPollingOff = false;
+    private boolean isStatisticsPollingOn = true;
     private boolean isStatisticsRpcEnabled;
-
-    private final LifecycleConductor conductor;
-
-    public OpenFlowPluginProviderImpl(final long rpcRequestsQuota, final Long globalNotificationQuota) {
+    private boolean isFlowRemovedNotificationOn = true;
+    private boolean skipTableFeatures = true;
+    private long basicTimerDelay;
+    private long maximumTimerDelay;
+
+    private final ThreadPoolExecutor threadPool;
+
+    public OpenFlowPluginProviderImpl(final long rpcRequestsQuota,
+                                      final long globalNotificationQuota,
+                                      final int threadPoolMinThreads,
+                                      final int threadPoolMaxThreads,
+                                      final long threadPoolTimeout) {
         Preconditions.checkArgument(rpcRequestsQuota > 0 && rpcRequestsQuota <= Integer.MAX_VALUE, "rpcRequestQuota has to be in range <1,%s>", Integer.MAX_VALUE);
         this.rpcRequestsQuota = (int) rpcRequestsQuota;
         this.globalNotificationQuota = Preconditions.checkNotNull(globalNotificationQuota);
-        conductor = new LifecycleConductorImpl(messageIntelligenceAgency);
+
+        // Creates a thread pool that creates new threads as needed, but will reuse previously
+        // constructed threads when they are available.
+        // Threads that have not been used for x seconds are terminated and removed from the cache.
+        threadPool = new ThreadPoolLoggingExecutor(
+                Preconditions.checkNotNull(threadPoolMinThreads),
+                Preconditions.checkNotNull(threadPoolMaxThreads),
+                Preconditions.checkNotNull(threadPoolTimeout), TimeUnit.SECONDS,
+                new SynchronousQueue<>(), "ofppool");
+        convertorManager = ConvertorManagerFactory.createDefaultManager();
     }
 
     @Override
-    public boolean isStatisticsPollingOff() {
-        return isStatisticsPollingOff;
+    public boolean isStatisticsPollingOn() {
+        return isStatisticsPollingOn;
     }
 
     @Override
-    public void setIsStatisticsPollingOff(final boolean isStatisticsPollingOff) {
-        this.isStatisticsPollingOff = isStatisticsPollingOff;
+    public void setStatisticsPollingOn(final boolean isStatisticsPollingOn) {
+        this.isStatisticsPollingOn = isStatisticsPollingOn;
     }
 
     private void startSwitchConnections() {
@@ -131,11 +160,6 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
         return switchFeaturesMandatory;
     }
 
-    @Override
-    public void setEntityOwnershipService(final EntityOwnershipService entityOwnershipService) {
-        this.entityOwnershipService = entityOwnershipService;
-    }
-
     @Override
     public void setBarrierCountLimit(final int barrierCountLimit) {
         this.barrierCountLimit = barrierCountLimit;
@@ -151,6 +175,35 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
         this.echoReplyTimeout = echoReplyTimeout;
     }
 
+    @Override
+    public void setFlowRemovedNotification(boolean isFlowRemovedNotificationOn) {
+        this.isFlowRemovedNotificationOn = this.isFlowRemovedNotificationOn;
+    }
+
+    @Override
+    public void setClusteringSingletonServicesProvider(ClusterSingletonServiceProvider singletonServicesProvider) {
+        this.singletonServicesProvider = singletonServicesProvider;
+    }
+
+    @Override
+    public void setEntityOwnershipServiceProvider(EntityOwnershipService entityOwnershipService) {
+        this.entityOwnershipService = entityOwnershipService;
+    }
+
+    @Override
+    public void setSkipTableFeatures(final boolean skipTableFeatures){
+            this.skipTableFeatures = skipTableFeatures;
+    }
+
+    @Override
+    public void setBasicTimerDelay(long basicTimerDelay) {
+        this.basicTimerDelay = basicTimerDelay;
+    }
+
+    @Override
+    public void setMaximumTimerDelay(long maximumTimerDelay) {
+        this.maximumTimerDelay = maximumTimerDelay;
+    }
 
     @Override
     public void setSwitchFeaturesMandatory(final boolean switchFeaturesMandatory) {
@@ -178,17 +231,17 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
 
     @Override
     public void initialize() {
-
         Preconditions.checkNotNull(dataBroker, "missing data broker");
         Preconditions.checkNotNull(rpcProviderRegistry, "missing RPC provider registry");
         Preconditions.checkNotNull(notificationProviderService, "missing notification provider service");
+        Preconditions.checkNotNull(singletonServicesProvider, "missing singleton services provider");
 
         extensionConverterManager = new ExtensionConverterManagerImpl();
         // TODO: copied from OpenFlowPluginProvider (Helium) misusesing the old way of distributing extension converters
         // TODO: rewrite later!
         OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterManager);
 
-        connectionManager = new ConnectionManagerImpl(echoReplyTimeout);
+        connectionManager = new ConnectionManagerImpl(echoReplyTimeout, threadPool);
 
         registerMXBean(messageIntelligenceAgency);
 
@@ -197,41 +250,94 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
                 switchFeaturesMandatory,
                 barrierInterval,
                 barrierCountLimit,
-                conductor);
-        ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
-
-        conductor.setSafelyDeviceManager(deviceManager);
+                getMessageIntelligenceAgency(),
+                isFlowRemovedNotificationOn,
+                singletonServicesProvider,
+                entityOwnershipService,
+                hashedWheelTimer,
+                convertorManager,
+                skipTableFeatures,
+                notificationPublishService);
 
-        roleManager = new RoleManagerImpl(entityOwnershipService, dataBroker, conductor);
-        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOff, conductor);
-        conductor.setSafelyStatisticsManager(statisticsManager);
-        rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, conductor);
+        ((ExtensionConverterProviderKeeper) deviceManager).setExtensionConverterProvider(extensionConverterManager);
 
-        roleManager.addRoleChangeListener((RoleChangeListener) conductor);
+        rpcManager = new RpcManagerImpl(rpcProviderRegistry, rpcRequestsQuota, extensionConverterManager, convertorManager, notificationPublishService);
+        statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOn, hashedWheelTimer,
+                convertorManager,basicTimerDelay,maximumTimerDelay);
 
         /* Initialization Phase ordering - OFP Device Context suite */
         // CM -> DM -> SM -> RPC -> Role -> DM
         connectionManager.setDeviceConnectedHandler(deviceManager);
         deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
         statisticsManager.setDeviceInitializationPhaseHandler(rpcManager);
-        rpcManager.setDeviceInitializationPhaseHandler(roleManager);
-        roleManager.setDeviceInitializationPhaseHandler(deviceManager);
+        rpcManager.setDeviceInitializationPhaseHandler(deviceManager);
 
         /* Termination Phase ordering - OFP Device Context suite */
         deviceManager.setDeviceTerminationPhaseHandler(rpcManager);
         rpcManager.setDeviceTerminationPhaseHandler(statisticsManager);
-        statisticsManager.setDeviceTerminationPhaseHandler(roleManager);
-        roleManager.setDeviceTerminationPhaseHandler(deviceManager);
+        statisticsManager.setDeviceTerminationPhaseHandler(deviceManager);
 
-        deviceManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
-        deviceManager.setNotificationPublishService(notificationPublishService);
+        rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
 
-        TranslatorLibraryUtil.setBasicTranslatorLibrary(deviceManager);
+        TranslatorLibraryUtil.injectBasicTranslatorLibrary(deviceManager, convertorManager);
         deviceManager.initialize();
 
         startSwitchConnections();
     }
 
+    @Override
+    public void update(Map<String,Object> props) {
+        LOG.debug("Update managed properties = {}", props.toString());
+
+        if(deviceManager != null) {
+            if (props.containsKey("notification-flow-removed-off")) {
+                deviceManager.setFlowRemovedNotificationOn(Boolean.valueOf(props.get("enable-flow-removed-notification").toString()));
+            }
+            if (props.containsKey("skip-table-features")) {
+                deviceManager.setSkipTableFeatures(Boolean.valueOf(props.get("skip-table-features").toString()));
+            }
+            if (props.containsKey("barrier-count-limit")) {
+                try {
+                    deviceManager.setBarrierCountLimit(Integer.valueOf(props.get("barrier-count-limit").toString()));
+                } catch (NumberFormatException ex) {
+                    deviceManager.setBarrierCountLimit(DEFAULT_BARRIER_COUNT);
+                }
+            }
+            if (props.containsKey("barrier-interval-timeout-limit")){
+                try {
+                    deviceManager.setBarrierInterval(Long.valueOf(props.get("barrier-interval-timeout-limit").toString()));
+                } catch (NumberFormatException ex) {
+                    deviceManager.setBarrierInterval(DEFAULT_BARRIER_TIMEOUT);
+                }
+            }
+        }
+
+        if(rpcManager != null && props.containsKey("is-statistics-rpc-enabled")){
+            rpcManager.setStatisticsRpcEnabled(Boolean.valueOf((props.get("is-statistics-rpc-enabled").toString())));
+        }
+
+        if (connectionManager != null && props.containsKey("echo-reply-timeout") ){
+            try {
+                connectionManager.setEchoReplyTimeout(Long.valueOf(props.get("echo-reply-timeout").toString()));
+            }catch (NumberFormatException ex){
+                connectionManager.setEchoReplyTimeout(DEFAULT_ECHO_TIMEOUT);
+            }
+        }
+
+        if(statisticsManager != null && props.containsKey("is-statistics-polling-on")){
+            statisticsManager.setIsStatisticsPollingOn(Boolean.valueOf(props.get("is-statistics-polling-on").toString()));
+        }
+
+        if(statisticsManager != null && props.containsKey("basic-timer-delay")){
+            statisticsManager.setBasicTimerDelay(Long.valueOf(props.get("basic-timer-delay").toString()));
+        }
+
+        if(statisticsManager != null && props.containsKey("maximum-timer-delay")){
+            statisticsManager.setMaximumTimerDelay(Long.valueOf(props.get("maximum-timer-delay").toString()));
+        }
+    }
+
+
     private static void registerMXBean(final MessageIntelligenceAgency messageIntelligenceAgency) {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try {
@@ -275,8 +381,7 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
         rpcManager.close();
         statisticsManager.close();
 
-        // TODO: needs to close org.opendaylight.openflowplugin.impl.role.OpenflowOwnershipListener after RoleContexts are down
-        // TODO: must not be executed prior to all living RoleContexts have been closed (via closing living DeviceContexts)
-        roleManager.close();
+        // Manually shutdown all remaining running threads in pool
+        threadPool.shutdown();
     }
-}
+}
\ No newline at end of file