Bug 5596 Cleaning part 1
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
index 931249708949ccf9a2baf21b995c8c600cbfd101..32b2d768faae2a0db2d2dca441c691cc8a7086d0 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -16,11 +17,15 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.openflowplugin.api.openflow.OFPManager;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
@@ -29,43 +34,78 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChang
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
+import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
+import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  */
-public final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener {
+final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper {
 
     private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class);
     private static final int TICKS_PER_WHEEL = 500;
     private static final long TICK_DURATION = 10; // 0.5 sec.
 
     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
+    private ExtensionConverterProvider extensionConverterProvider;
     private DeviceManager deviceManager;
+    private StatisticsManager statisticsManager;
+    private RpcManager rpcManager;
     private final MessageIntelligenceAgency messageIntelligenceAgency;
+    private final ConvertorExecutor convertorExecutor;
     private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
-    private StatisticsManager statisticsManager;
+    private NotificationPublishService notificationPublishService;
 
-    LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) {
-        Preconditions.checkNotNull(messageIntelligenceAgency);
-        this.messageIntelligenceAgency = messageIntelligenceAgency;
+    LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency, ConvertorExecutor convertorExecutor) {
+        this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
+        this.convertorExecutor = convertorExecutor;
     }
 
     @Override
-    public void setSafelyDeviceManager(final DeviceManager deviceManager) {
-        if (this.deviceManager == null) {
-            this.deviceManager = deviceManager;
-        }
+    public ExtensionConverterProvider getExtensionConverterProvider() {
+        return extensionConverterProvider;
+    }
+
+    @Override
+    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+        this.extensionConverterProvider = extensionConverterProvider;
     }
 
     @Override
-    public void setSafelyStatisticsManager(final StatisticsManager statisticsManager) {
-        if (this.statisticsManager == null) {
-            this.statisticsManager = statisticsManager;
+    public void setSafelyManager(final OFPManager manager){
+        if (manager instanceof RpcManager) {
+            if (rpcManager != null) {
+                LOG.info("RPC manager {} is already defined in conductor. ", manager);
+                return;
+            }
+            this.rpcManager = (RpcManager) manager;
+        } else {
+            if (manager instanceof StatisticsManager) {
+                if (statisticsManager != null) {
+                    LOG.info("Statistics manager {} is already defined in conductor. ", manager);
+                    return;
+                }
+                this.statisticsManager = (StatisticsManager) manager;
+            } else {
+                if (manager instanceof DeviceManager) {
+                    if (deviceManager != null) {
+                        LOG.info("Device manager {} is already defined in conductor. ", manager);
+                        return;
+                    }
+                    this.deviceManager = (DeviceManager) manager;
+                }
+            }
         }
     }
 
@@ -83,7 +123,7 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha
         LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
         for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
             if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
-                LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success);
+                LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo.getNodeId().getValue(), success);
                 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
                 serviceChangeListeners.remove(deviceInfo);
             }
@@ -93,10 +133,10 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha
     @Override
     public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo);
+            LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo.getNodeId().getValue());
         }
     }
 
@@ -109,46 +149,79 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha
     }
 
     @Override
-    public void roleChangeOnDevice(final DeviceInfo deviceInfo, final boolean success, final OfpRole newRole, final boolean initializationPhase) {
+    public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
 
-        final DeviceContext deviceContext = getDeviceContext(deviceInfo);
+        final DeviceContext deviceContext = Preconditions.checkNotNull(
+                deviceManager.gainContext(deviceInfo),
+                "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
+        );
 
-        if (null == deviceContext) {
-            LOG.warn("Something went wrong, device context for nodeId: {} doesn't exists");
-            return;
-        }
-        if (!success) {
-            LOG.warn("Role change to {} in role context for node {} was NOT successful, closing connection", newRole, deviceInfo);
-            closeConnection(deviceInfo);
+        final RpcContext rpcContext =  Preconditions.checkNotNull(
+                rpcManager.gainContext(deviceInfo),
+                "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId().getValue()
+        );
+
+        LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo.getNodeId().getValue());
+
+        if (OfpRole.BECOMEMASTER.equals(newRole)) {
+            fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
+            MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider, convertorExecutor);
+
+            if (rpcContext.isStatisticsRpcEnabled()) {
+                MdSalRegistrationUtils.registerStatCompatibilityServices(
+                        rpcContext,
+                        deviceContext,
+                        notificationPublishService, convertorExecutor);
+            }
         } else {
-            if (initializationPhase) {
-                LOG.debug("Initialization phase skipping starting services.");
-                return;
+            statisticsManager.stopScheduling(deviceInfo);
+
+            // Clean device flow registry if we became slave
+            if (OfpRole.BECOMESLAVE.equals(newRole)) {
+                deviceContext.getDeviceFlowRegistry().close();
             }
 
-            LOG.info("Role change to {} in role context for node {} was successful, starting/stopping services.", newRole, deviceInfo);
+            MdSalRegistrationUtils.unregisterServices(rpcContext);
+        }
+
+    }
 
-            if (OfpRole.BECOMEMASTER.equals(newRole)) {
-                statisticsManager.startScheduling(deviceContext.getPrimaryConnectionContext().getDeviceInfo());
-            } else {
-                statisticsManager.stopScheduling(deviceContext.getPrimaryConnectionContext().getDeviceInfo());
-            }
+    private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+        // Fill device flow registry with flows from datastore
+        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+        // Start statistics scheduling only after we finished initializing device flow registry
+        Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+            @Override
+            public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+                if (LOG.isDebugEnabled()) {
+                    // Count all flows we read from datastore for debugging purposes.
+                    // This number do not always represent how many flows were actually added
+                    // to DeviceFlowRegistry, because of possible duplicates.
+                    long flowCount = Optional.fromNullable(result).asSet().stream()
+                            .flatMap(Collection::stream)
+                            .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+                            .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                            .flatMap(table -> table.getFlow().stream())
+                            .count();
 
-            final ListenableFuture<Void> onClusterRoleChange = deviceContext.onClusterRoleChange(null, newRole);
-            Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(@Nullable final Void aVoid) {
-                    LOG.info("Starting/Stopping services for node {} was successful", deviceInfo);
-                    if (newRole.equals(OfpRole.BECOMESLAVE)) notifyServiceChangeListeners(deviceInfo, true);
+                    LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId().getValue());
                 }
 
-                @Override
-                public void onFailure(final Throwable throwable) {
-                    LOG.warn("Starting/Stopping services for node {} was NOT successful, closing connection", deviceInfo);
-                    closeConnection(deviceInfo);
+                statisticsManager.startScheduling(deviceInfo);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                // If we manually cancelled this future, do not start scheduling of statistics
+                if (deviceFlowRegistryFill.isCancelled()) {
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId().getValue());
+                } else {
+                    LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId().getValue(), t);
+                    statisticsManager.startScheduling(deviceInfo);
                 }
-            });
-        }
+            }
+        });
     }
 
     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
@@ -157,19 +230,20 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha
 
     @Override
     public DeviceContext getDeviceContext(DeviceInfo deviceInfo){
-         return deviceManager.getDeviceContextFromNodeId(deviceInfo);
+         return deviceManager.gainContext(deviceInfo);
     }
 
     @Override
-    public Short gainVersionSafely(final DeviceInfo deviceInfo) {
-        return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getFeatures().getVersion() : null;
+    public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){
+        return statisticsManager.gainContext(deviceInfo);
     }
 
     public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) {
         return hashedWheelTimer.newTimeout(task, delay, unit);
     }
 
-    ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
+    @Override
+    public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
         return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null;
     }
 
@@ -181,26 +255,35 @@ public final class LifecycleConductorImpl implements LifecycleConductor, RoleCha
     @Override
     public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo);
+            LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo.getNodeId().getValue());
         }
     }
 
     @Override
     public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
         if (!success) {
-            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
+            LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo.getNodeId().getValue());
             closeConnection(deviceInfo);
         } else {
-            LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo);
+            LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo.getNodeId().getValue());
         }
     }
 
     @VisibleForTesting
-    public boolean isServiceChangeListenersEmpty() {
+    boolean isServiceChangeListenersEmpty() {
         return this.serviceChangeListeners.isEmpty();
     }
 
+    @Override
+    public NotificationPublishService getNotificationPublishService() {
+        return notificationPublishService;
+    }
+
+    @Override
+    public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
+        this.notificationPublishService = notificationPublishService;
+    }
 }