Merge "Enabling Remote RPC Router module in ODL distribution."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
index 873b53ff65aa23ececb17d733d64fb438f3f6098..dacc130831ff18332adff071d400621096e8b48b 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -25,37 +27,37 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
+import org.opendaylight.controller.protocol_plugin.openflow.IDiscoveryListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.Edge;
+import org.opendaylight.controller.sal.core.IContainerAware;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.State;
 import org.opendaylight.controller.sal.core.UpdateType;
-import org.opendaylight.controller.sal.discovery.IDiscoveryService;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The class describes a shim layer that relays the topology events from
  * OpenFlow core to various listeners. The notifications are filtered based on
  * container configurations.
  */
-public class TopologyServiceShim implements IDiscoveryService,
+public class TopologyServiceShim implements IDiscoveryListener,
         IContainerListener, CommandProvider, IRefreshInternalProvider,
-        IInventoryShimExternalListener {
+        IInventoryShimExternalListener, IContainerAware {
     protected static final Logger logger = LoggerFactory
             .getLogger(TopologyServiceShim.class);
     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
@@ -108,6 +110,7 @@ public class TopologyServiceShim implements IDiscoveryService,
             this.notifyQ = notifyQ;
         }
 
+        @Override
         public void run() {
             while (true) {
                 try {
@@ -124,12 +127,15 @@ public class TopologyServiceShim implements IDiscoveryService,
                         teuMap.put(entry.container, teuList);
                         notifyListeners = true;
                     }
-                    
+
                     if (notifyListeners) {
                         for (String container : teuMap.keySet()) {
                             // notify the listener
-                            topologyServiceShimListeners.get(container)
-                                    .edgeUpdate(teuMap.get(container));
+                            ITopologyServiceShimListener l = topologyServiceShimListeners.get(container);
+                            // container topology service may not have come up yet
+                            if (l != null) {
+                                l.edgeUpdate(teuMap.get(container));
+                            }
                         }
                     }
 
@@ -164,6 +170,7 @@ public class TopologyServiceShim implements IDiscoveryService,
             this.notifyQ = notifyQ;
         }
 
+        @Override
         public void run() {
             while (true) {
                 try {
@@ -173,16 +180,22 @@ public class TopologyServiceShim implements IDiscoveryService,
                     for (String container : containerList) {
                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                                 .get(container);
-                        Edge edge = edgePropsMap.get(connector).getLeft();
-                        if (edge.getTailNodeConnector().equals(connector)) {
-                            ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
-                                    .get(container);
-                            if (update.type == UpdateType.ADDED) {
-                                topologServiceShimListener
-                                        .edgeOverUtilized(edge);
-                            } else {
-                                topologServiceShimListener
-                                        .edgeUtilBackToNormal(edge);
+                        // the edgePropsMap for a particular container may not have
+                        // the connector.
+                        // so check for null
+                        Pair<Edge, Set<Property>> edgeProp = edgePropsMap.get(connector);
+                        if(edgeProp != null) {
+                            Edge edge = edgeProp.getLeft();
+                            if (edge.getTailNodeConnector().equals(connector)) {
+                                ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
+                                        .get(container);
+                                if (update.type == UpdateType.ADDED) {
+                                    topologServiceShimListener
+                                    .edgeOverUtilized(edge);
+                                } else {
+                                    topologServiceShimListener
+                                    .edgeUtilBackToNormal(edge);
+                                }
                             }
                         }
                     }
@@ -203,7 +216,7 @@ public class TopologyServiceShim implements IDiscoveryService,
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
-     * 
+     *
      */
     void init() {
         logger.trace("Init called");
@@ -291,18 +304,18 @@ public class TopologyServiceShim implements IDiscoveryService,
             // Compare bandwidth usage
             Long switchId = (Long) connector.getNode().getID();
             Short port = (Short) connector.getID();
-            float rate = statsMgr.getTransmitRate(switchId, port);
-            if (rate > bwThresholdFactor * bw) {
-                if (!connectorsOverUtilized.contains(connector)) {
-                    connectorsOverUtilized.add(connector);
-                    this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
-                            UpdateType.ADDED));
-                }
-            } else {
-                if (connectorsOverUtilized.contains(connector)) {
-                    connectorsOverUtilized.remove(connector);
-                    this.bwUtilNotifyQ.add(new UtilizationUpdate(connector,
-                            UpdateType.REMOVED));
+            if (statsMgr != null) {
+                float rate = statsMgr.getTransmitRate(switchId, port);
+                if (rate > bwThresholdFactor * bw) {
+                    if (!connectorsOverUtilized.contains(connector)) {
+                        connectorsOverUtilized.add(connector);
+                        this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.ADDED));
+                    }
+                } else {
+                    if (connectorsOverUtilized.contains(connector)) {
+                        connectorsOverUtilized.remove(connector);
+                        this.bwUtilNotifyQ.add(new UtilizationUpdate(connector, UpdateType.REMOVED));
+                    }
                 }
             }
         }
@@ -313,7 +326,7 @@ public class TopologyServiceShim implements IDiscoveryService,
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
      * example bundle is being stopped.
-     * 
+     *
      */
     void destroy() {
         logger.trace("DESTROY called!");
@@ -324,7 +337,7 @@ public class TopologyServiceShim implements IDiscoveryService,
     /**
      * Function called by dependency manager after "init ()" is called and after
      * the services provided by the class are registered in the service registry
-     * 
+     *
      */
     void start() {
         logger.trace("START called!");
@@ -338,7 +351,7 @@ public class TopologyServiceShim implements IDiscoveryService,
      * Function called by the dependency manager before the services exported by
      * the component are unregistered, this will be followed by a "destroy ()"
      * calls
-     * 
+     *
      */
     void stop() {
         logger.trace("STOP called!");
@@ -398,6 +411,92 @@ public class TopologyServiceShim implements IDiscoveryService,
         }
     }
 
+    private void updateContainerMap(List<String> containers, NodeConnector p) {
+        if (containers.isEmpty()) {
+            // Do cleanup to reduce memory footprint if no
+            // elements to be tracked
+            this.containerMap.remove(p);
+        } else {
+            this.containerMap.put(p, containers);
+        }
+    }
+
+    /**
+     * From a given edge map, retrieve the edge sourced by the port and update
+     * the local cache in the container
+     *
+     * @param container
+     *            the container name
+     * @param nodeConnector
+     *            the node connector
+     * @param edges
+     *            the given edge map
+     * @return the found edge
+     */
+    private Edge addEdge(String container, NodeConnector nodeConnector,
+            Map<NodeConnector, Pair<Edge, Set<Property>>> edges) {
+        logger.debug("Search edge sourced by port {} in container {}", nodeConnector, container);
+
+        // Retrieve the associated edge
+        Pair<Edge, Set<Property>> edgeProps = edges.get(nodeConnector);
+        if (edgeProps == null) {
+            logger.debug("edgePros is null for port {} in container {}", nodeConnector, container);
+            return null;
+        }
+
+        Edge edge = edgeProps.getLeft();
+        if (edge == null) {
+            logger.debug("edge is null for port {} in container {}", nodeConnector, container);
+            return null;
+        }
+
+        // Make sure the peer port is in the same container
+        NodeConnector peerConnector = edge.getHeadNodeConnector();
+        List<String> containers = this.containerMap.get(peerConnector);
+        if ((containers == null) || !containers.contains(container)) {
+            logger.debug("peer port {} of edge {} is not part of the container {}", new Object[] { peerConnector, edge,
+                    container });
+            return null;
+        }
+
+        // Update the local cache
+        updateLocalEdgeMap(container, edge, UpdateType.ADDED, edgeProps.getRight());
+        logger.debug("Added edge {} to local cache in container {}", edge, container);
+
+        return edge;
+    }
+
+    private void addNodeConnector(String container,
+            NodeConnector nodeConnector) {
+        // Use the global edge map for the newly added port in a container
+        Map<NodeConnector, Pair<Edge, Set<Property>>> globalEdgeMap = edgeMap.get(GlobalConstants.DEFAULT
+                .toString());
+        if (globalEdgeMap == null) {
+            return;
+        }
+
+        // Get the edge and update local cache in the container
+        Edge edge1, edge2;
+        edge1 = addEdge(container, nodeConnector, globalEdgeMap);
+        if (edge1 == null) {
+            return;
+        }
+
+        // Get the edge in reverse direction and update local cache in the container
+        NodeConnector peerConnector = edge1.getHeadNodeConnector();
+        edge2 = addEdge(container, peerConnector, globalEdgeMap);
+
+        // Send notification upwards in one shot
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        teuList.add(new TopoEdgeUpdate(edge1, null, UpdateType.ADDED));
+        logger.debug("Notify edge1: {} in container {}", edge1, container);
+        if (edge2 != null) {
+            teuList.add(new TopoEdgeUpdate(edge2, null, UpdateType.ADDED));
+            logger.debug("Notify edge2: {} in container {}", edge2, container);
+        }
+        notifyEdge(container, teuList);
+    }
+
     private void removeNodeConnector(String container,
             NodeConnector nodeConnector) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
@@ -431,7 +530,7 @@ public class TopologyServiceShim implements IDiscoveryService,
     /**
      * Update local cache and return true if it needs to notify upper layer
      * Topology listeners.
-     * 
+     *
      * @param container
      *            The network container
      * @param edge
@@ -493,14 +592,14 @@ public class TopologyServiceShim implements IDiscoveryService,
                     "notifyLocalEdgeMap: {} for Edge {} in container {}",
                     new Object[] { type.getName(), edge, container });
         }
-        
+
         return rv;
     }
 
     private void notifyEdge(String container, Edge edge, UpdateType type,
             Set<Property> props) {
         boolean notifyListeners;
-        
+
         // Update local cache
         notifyListeners = updateLocalEdgeMap(container, edge, type, props);
 
@@ -508,7 +607,7 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (notifyListeners) {
             notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
                     type)));
-            logger.debug("notifyEdge: {} Edge {} in container {}", 
+            logger.debug("notifyEdge: {} Edge {} in container {}",
                     new Object[] { type.getName(), edge, container });
         }
     }
@@ -611,33 +710,24 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (containers == null) {
             containers = new CopyOnWriteArrayList<String>();
         }
-        boolean updateMap = false;
         switch (t) {
         case ADDED:
             if (!containers.contains(containerName)) {
                 containers.add(containerName);
-                updateMap = true;
+                updateContainerMap(containers, p);
+                addNodeConnector(containerName, p);
             }
             break;
         case REMOVED:
             if (containers.contains(containerName)) {
                 containers.remove(containerName);
-                updateMap = true;
+                updateContainerMap(containers, p);
                 removeNodeConnector(containerName, p);
             }
             break;
         case CHANGED:
             break;
         }
-        if (updateMap) {
-            if (containers.isEmpty()) {
-                // Do cleanup to reduce memory footprint if no
-                // elements to be tracked
-                this.containerMap.remove(p);
-            } else {
-                this.containerMap.put(p, containers);
-            }
-        }
     }
 
     @Override
@@ -712,11 +802,11 @@ public class TopologyServiceShim implements IDiscoveryService,
      * pushes the updates to ALL the applications that have registered as
      * listeners for this service. SAL has no way of knowing which application
      * requested for the refresh.
-     * 
+     *
      * As an example of this case, is stopping and starting the Topology
      * Manager. When the topology Manager is stopped, and restarted, it will no
      * longer have the latest topology. Hence, a request is sent here.
-     * 
+     *
      * @param containerName
      * @return void
      */
@@ -727,19 +817,36 @@ public class TopologyServiceShim implements IDiscoveryService,
         bulkNotifyQ.add(containerName);
     }
 
+    /**
+     * Retrieve the edges for a given container
+     *
+     * @param containerName
+     *            the container name
+     * @return the edges and their properties
+     */
+    private Collection<Pair<Edge, Set<Property>>> getEdgeProps(String containerName) {
+        Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
+        edgePropMap = edgeMap.get(containerName);
+        if (edgePropMap == null) {
+            return null;
+        }
+        return edgePropMap.values();
+    }
+
     /**
      * Reading the current topology database, the method will replay all the
      * edge updates for the ITopologyServiceShimListener instance in the given
      * container, which will in turn publish them toward SAL.
-     * 
+     *
      * @param containerName
+     *            the container name
      */
     private void TopologyBulkUpdate(String containerName) {
-        Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropMap = null;
+        Collection<Pair<Edge, Set<Property>>> edgeProps = null;
 
         logger.debug("Try bulk update for container:{}", containerName);
-        edgePropMap = edgeMap.get(containerName);
-        if (edgePropMap == null) {
+        edgeProps = getEdgeProps(containerName);
+        if (edgeProps == null) {
             logger.debug("No edges known for container:{}", containerName);
             return;
         }
@@ -752,12 +859,12 @@ public class TopologyServiceShim implements IDiscoveryService,
         }
         int i = 0;
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
-        for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
-            if (edgeProps != null) {
+        for (Pair<Edge, Set<Property>> edgeProp : edgeProps) {
+            if (edgeProp != null) {
                 i++;
-                teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
+                teuList.add(new TopoEdgeUpdate(edgeProp.getLeft(), edgeProp
                         .getRight(), UpdateType.ADDED));
-                logger.trace("Add edge {}", edgeProps.getLeft());
+                logger.trace("Add edge {}", edgeProp.getLeft());
             }
         }
         if (i > 0) {
@@ -818,4 +925,29 @@ public class TopologyServiceShim implements IDiscoveryService,
             break;
         }
     }
+
+    @Override
+    public void containerCreate(String containerName) {
+        // do nothing
+    }
+
+    @Override
+    public void containerDestroy(String containerName) {
+        Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
+        for (Map.Entry<NodeConnector, List<String>> entry : containerMap.entrySet()) {
+            List<String> ncContainers = entry.getValue();
+            if (ncContainers.contains(containerName)) {
+                NodeConnector nodeConnector = entry.getKey();
+                removeNodeConnectorSet.add(nodeConnector);
+            }
+        }
+        for (NodeConnector nodeConnector : removeNodeConnectorSet) {
+            List<String> ncContainers = containerMap.get(nodeConnector);
+            ncContainers.remove(containerName);
+            if (ncContainers.isEmpty()) {
+                containerMap.remove(nodeConnector);
+            }
+        }
+        edgeMap.remove(containerName);
+    }
 }