Bug 5596 Cleaning part 1
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
index 74c7f39d5422164f6d6ae66e08746da9bd8cb7d8..32b2d768faae2a0db2d2dca441c691cc8a7086d0 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -16,10 +17,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
@@ -32,6 +34,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChang
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
@@ -40,6 +43,8 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +63,13 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
     private StatisticsManager statisticsManager;
     private RpcManager rpcManager;
     private final MessageIntelligenceAgency messageIntelligenceAgency;
+    private final ConvertorExecutor convertorExecutor;
     private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
     private NotificationPublishService notificationPublishService;
 
-    LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) {
+    LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency, ConvertorExecutor convertorExecutor) {
         this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
+        this.convertorExecutor = convertorExecutor;
     }
 
     @Override
@@ -116,7 +123,7 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
         LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
         for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
             if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
-                LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success);
+                LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo.getNodeId().getValue(), success);
                 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
                 serviceChangeListeners.remove(deviceInfo);
             }
@@ -126,10 +133,10 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
     @Override
     public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo);
+            LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo.getNodeId().getValue());
         }
     }
 
@@ -137,71 +144,84 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
         LOG.debug("Close connection called for node {}", deviceInfo);
         final DeviceContext deviceContext = getDeviceContext(deviceInfo);
         if (null != deviceContext) {
-            deviceManager.notifyDeviceValidListeners(deviceInfo, false);
             deviceContext.shutdownConnection();
         }
     }
 
     @Override
-    public void roleChangeOnDevice(final DeviceInfo deviceInfo, final boolean success, final OfpRole newRole, final boolean initializationPhase) {
+    public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
 
-        final DeviceContext deviceContext = getDeviceContext(deviceInfo);
+        final DeviceContext deviceContext = Preconditions.checkNotNull(
+                deviceManager.gainContext(deviceInfo),
+                "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
+        );
 
-        if (null == deviceContext) {
-            LOG.warn("Something went wrong, device context for nodeId: {} doesn't exists");
-            return;
-        }
-        if (!success) {
-            LOG.warn("Role change to {} in role context for node {} was NOT successful, closing connection", newRole, deviceInfo);
-            closeConnection(deviceInfo);
-        } else {
-            if (initializationPhase) {
-                LOG.debug("Initialization phase skipping starting services.");
-                return;
-            }
+        final RpcContext rpcContext =  Preconditions.checkNotNull(
+                rpcManager.gainContext(deviceInfo),
+                "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
+        );
 
-            LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo);
+        LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo.getNodeId().getValue());
 
-            final String logText;
+        if (OfpRole.BECOMEMASTER.equals(newRole)) {
+            fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
+            MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider, convertorExecutor);
 
-            if (OfpRole.BECOMEMASTER.equals(newRole)) {
-                logText = "Start";
-                statisticsManager.startScheduling(deviceInfo);
-                MdSalRegistrationUtils.registerMasterServices(
-                        rpcManager.gainContext(deviceInfo),
+            if (rpcContext.isStatisticsRpcEnabled()) {
+                MdSalRegistrationUtils.registerStatCompatibilityServices(
+                        rpcContext,
                         deviceContext,
-                        OfpRole.BECOMEMASTER,
-                        this.extensionConverterProvider);
-                if (((RpcContext)rpcManager.gainContext(deviceInfo)).isStatisticsRpcEnabled()) {
-                    MdSalRegistrationUtils.registerStatCompatibilityServices(
-                            rpcManager.gainContext(deviceInfo),
-                            deviceManager.gainContext(deviceInfo),
-                            notificationPublishService,
-                            new AtomicLong());
-                }
-            } else {
-                logText = "Stopp";
-                statisticsManager.stopScheduling(deviceInfo);
-                MdSalRegistrationUtils.registerSlaveServices(
-                        rpcManager.gainContext(deviceInfo),
-                        OfpRole.BECOMESLAVE);
+                        notificationPublishService, convertorExecutor);
             }
+        } else {
+            statisticsManager.stopScheduling(deviceInfo);
 
-            final ListenableFuture<Void> onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole);
-            Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(@Nullable final Void aVoid) {
-                    LOG.info("{}ing services for node {} was successful", logText, deviceInfo);
-                    if (newRole.equals(OfpRole.BECOMESLAVE)) notifyServiceChangeListeners(deviceInfo, true);
+            // Clean device flow registry if we became slave
+            if (OfpRole.BECOMESLAVE.equals(newRole)) {
+                deviceContext.getDeviceFlowRegistry().close();
+            }
+
+            MdSalRegistrationUtils.unregisterServices(rpcContext);
+        }
+
+    }
+
+    private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+        // Fill device flow registry with flows from datastore
+        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+        // Start statistics scheduling only after we finished initializing device flow registry
+        Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+            @Override
+            public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+                if (LOG.isDebugEnabled()) {
+                    // Count all flows we read from datastore for debugging purposes.
+                    // This number do not always represent how many flows were actually added
+                    // to DeviceFlowRegistry, because of possible duplicates.
+                    long flowCount = Optional.fromNullable(result).asSet().stream()
+                            .flatMap(Collection::stream)
+                            .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+                            .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                            .flatMap(table -> table.getFlow().stream())
+                            .count();
+
+                    LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId().getValue());
                 }
 
-                @Override
-                public void onFailure(final Throwable throwable) {
-                    LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo);
-                    closeConnection(deviceInfo);
+                statisticsManager.startScheduling(deviceInfo);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // If we manually cancelled this future, do not start scheduling of statistics
+                if (deviceFlowRegistryFill.isCancelled()) {
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId().getValue());
+                } else {
+                    LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId().getValue(), t);
+                    statisticsManager.startScheduling(deviceInfo);
                 }
-            });
-        }
+            }
+        });
     }
 
     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
@@ -235,20 +255,20 @@ final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeList
     @Override
     public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo);
+            LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo.getNodeId().getValue());
         }
     }
 
     @Override
     public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo);
+            LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo.getNodeId().getValue());
         }
     }