As part of performance enhancements, Topology updates are now batched as much as...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / TopologyServiceShim.java
index af502a2177d12c89101ca5b2363f90a9eeabe4da..873b53ff65aa23ececb17d733d64fb438f3f6098 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +45,7 @@ import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.State;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
 
 /**
@@ -52,8 +54,8 @@ import org.opendaylight.controller.sal.utils.GlobalConstants;
  * container configurations.
  */
 public class TopologyServiceShim implements IDiscoveryService,
-               IContainerListener, CommandProvider, IRefreshInternalProvider,
-               IInventoryShimExternalListener {
+        IContainerListener, CommandProvider, IRefreshInternalProvider,
+        IInventoryShimExternalListener {
     protected static final Logger logger = LoggerFactory
             .getLogger(TopologyServiceShim.class);
     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
@@ -76,19 +78,31 @@ public class TopologyServiceShim implements IDiscoveryService,
 
     class NotifyEntry {
         String container;
-        Pair<Edge, Set<Property>> edgeProps;
-        UpdateType type;
+        List<TopoEdgeUpdate> teuList;
 
-        NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
-                UpdateType type) {
+        public NotifyEntry(String container, TopoEdgeUpdate teu) {
             this.container = container;
-            this.edgeProps = edgeProps;
-            this.type = type;
+            this.teuList = new ArrayList<TopoEdgeUpdate>();
+            if (teu != null) {
+                this.teuList.add(teu);
+            }
+        }
+
+        public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
+            this.container = container;
+            this.teuList = new ArrayList<TopoEdgeUpdate>();
+            if (teuList != null) {
+                this.teuList.addAll(teuList);
+            }
         }
     }
 
     class TopologyNotify implements Runnable {
         private final BlockingQueue<NotifyEntry> notifyQ;
+        private NotifyEntry entry;
+        private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
+        private List<TopoEdgeUpdate> teuList;
+        private boolean notifyListeners;
 
         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
             this.notifyQ = notifyQ;
@@ -97,22 +111,37 @@ public class TopologyServiceShim implements IDiscoveryService,
         public void run() {
             while (true) {
                 try {
-                    NotifyEntry entry = notifyQ.take();
-
-                    ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
-                            .get(entry.container);
-                    topologServiceShimListener.edgeUpdate(
-                            entry.edgeProps.getLeft(), entry.type,
-                            entry.edgeProps.getRight());
+                    teuMap.clear();
+                    notifyListeners = false;
+                    while (!notifyQ.isEmpty()) {
+                        entry = notifyQ.take();
+                        teuList = teuMap.get(entry.container);
+                        if (teuList == null) {
+                            teuList = new ArrayList<TopoEdgeUpdate>();
+                        }
+                        // group all the updates together
+                        teuList.addAll(entry.teuList);
+                        teuMap.put(entry.container, teuList);
+                        notifyListeners = true;
+                    }
+                    
+                    if (notifyListeners) {
+                        for (String container : teuMap.keySet()) {
+                            // notify the listener
+                            topologyServiceShimListeners.get(container)
+                                    .edgeUpdate(teuMap.get(container));
+                        }
+                    }
 
-                    entry = null;
+                    Thread.sleep(100);
                 } catch (InterruptedException e1) {
-                    logger.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    logger.warn("TopologyNotify interrupted {}",
+                            e1.getMessage());
                     if (shuttingDown) {
                         return;
                     }
                 } catch (Exception e2) {
-                    logger.error("",e2);
+                    logger.error("", e2);
                 }
             }
         }
@@ -165,7 +194,7 @@ public class TopologyServiceShim implements IDiscoveryService,
                         return;
                     }
                 } catch (Exception e2) {
-                    logger.error("",e2);
+                    logger.error("", e2);
                 }
             }
         }
@@ -353,7 +382,8 @@ public class TopologyServiceShim implements IDiscoveryService,
                 && this.topologyServiceShimListeners.get(containerName).equals(
                         s)) {
             this.topologyServiceShimListeners.remove(containerName);
-            logger.trace("Removed topologyServiceShimListener for container: {}",
+            logger.trace(
+                    "Removed topologyServiceShimListener for container: {}",
                     containerName);
         }
     }
@@ -370,6 +400,7 @@ public class TopologyServiceShim implements IDiscoveryService,
 
     private void removeNodeConnector(String container,
             NodeConnector nodeConnector) {
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                 .get(container);
         if (edgePropsMap == null) {
@@ -381,7 +412,8 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (edgeProps == null) {
             return;
         }
-        notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+        teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+                UpdateType.REMOVED));
 
         // Remove edge in another direction
         edgeProps = edgePropsMap
@@ -389,52 +421,130 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (edgeProps == null) {
             return;
         }
-        notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+        teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+                UpdateType.REMOVED));
+
+        // Update in one shot
+        notifyEdge(container, teuList);
     }
 
-    private void notifyEdge(String container, Edge edge, UpdateType type,
-            Set<Property> props) {
+    /**
+     * Update local cache and return true if it needs to notify upper layer
+     * Topology listeners.
+     * 
+     * @param container
+     *            The network container
+     * @param edge
+     *            The edge
+     * @param type
+     *            The update type
+     * @param props
+     *            The edge properties
+     * @return true if it needs to notify upper layer Topology listeners
+     */
+    private boolean updateLocalEdgeMap(String container, Edge edge,
+            UpdateType type, Set<Property> props) {
         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                 .get(container);
         NodeConnector src = edge.getTailNodeConnector();
         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
                 edge, props);
+        boolean rv = false;
 
         switch (type) {
         case ADDED:
         case CHANGED:
             if (edgePropsMap == null) {
                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
+                rv = true;
             } else {
                 if (edgePropsMap.containsKey(src)
                         && edgePropsMap.get(src).equals(edgeProps)) {
-                    // Entry already exists. Return here.
-                    return;
+                    // Entry already exists. No update.
+                    rv = false;
+                } else {
+                    rv = true;
                 }
             }
-            edgePropsMap.put(src, edgeProps);
-            edgeMap.put(container, edgePropsMap);
+            if (rv) {
+                edgePropsMap.put(src, edgeProps);
+                edgeMap.put(container, edgePropsMap);
+            }
             break;
         case REMOVED:
-            if (edgePropsMap != null) {
+            if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
                 edgePropsMap.remove(src);
                 if (edgePropsMap.isEmpty()) {
                     edgeMap.remove(container);
                 } else {
                     edgeMap.put(container, edgePropsMap);
                 }
+                rv = true;
             }
             break;
         default:
-            logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
-                    type, edge, container);
+            logger.debug(
+                    "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
+                    new Object[] { type.getName(), edge, container });
+        }
+
+        if (rv) {
+            logger.debug(
+                    "notifyLocalEdgeMap: {} for Edge {} in container {}",
+                    new Object[] { type.getName(), edge, container });
+        }
+        
+        return rv;
+    }
+
+    private void notifyEdge(String container, Edge edge, UpdateType type,
+            Set<Property> props) {
+        boolean notifyListeners;
+        
+        // Update local cache
+        notifyListeners = updateLocalEdgeMap(container, edge, type, props);
+
+        // Prepare to update TopologyService
+        if (notifyListeners) {
+            notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
+                    type)));
+            logger.debug("notifyEdge: {} Edge {} in container {}", 
+                    new Object[] { type.getName(), edge, container });
+        }
+    }
+
+    private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
+        if (etuList == null) {
             return;
         }
 
-        notifyQ.add(new NotifyEntry(container, edgeProps, type));
+        Edge edge;
+        UpdateType type;
+        List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
+        boolean notifyListeners = false, rv;
+
+        for (TopoEdgeUpdate etu : etuList) {
+            edge = etu.getEdge();
+            type = etu.getUpdateType();
+
+            // Update local cache
+            rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
+            if (rv) {
+                if (!notifyListeners) {
+                    notifyListeners = true;
+                }
+                etuNotifyList.add(etu);
+                logger.debug(
+                        "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
+                        new Object[] { type.getName(), edge, container });
+            }
+        }
 
-        logger.debug("notifyEdge: {} Edge {} in container {}",
-                type, edge, container);
+        // Prepare to update TopologyService
+        if (notifyListeners) {
+            notifyQ.add(new NotifyEntry(container, etuNotifyList));
+            logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
+        }
     }
 
     @Override
@@ -572,9 +682,12 @@ public class TopologyServiceShim implements IDiscoveryService,
             }
 
             long bw = 0;
-            for (Property prop : edgeProps.getRight()) {
-                if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
-                    bw = ((Bandwidth) prop).getValue();
+            Set<Property> props = edgeProps.getRight();
+            if (props != null) {
+                for (Property prop : props) {
+                    if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
+                        bw = ((Bandwidth) prop).getValue();
+                    }
                 }
             }
             count++;
@@ -638,14 +751,18 @@ public class TopologyServiceShim implements IDiscoveryService,
             return;
         }
         int i = 0;
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
             if (edgeProps != null) {
                 i++;
+                teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
+                        .getRight(), UpdateType.ADDED));
                 logger.trace("Add edge {}", edgeProps.getLeft());
-                topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
-                        UpdateType.ADDED, edgeProps.getRight());
             }
         }
+        if (i > 0) {
+            topologServiceShimListener.edgeUpdate(teuList);
+        }
         logger.debug("Sent {} updates", i);
     }
 
@@ -663,7 +780,7 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (conList != null) {
             containers.addAll(conList);
         }
-        
+
         switch (type) {
         case ADDED:
             break;