As part of performance enhancements, Topology updates are now batched as much as... 95/295/1
authorJason Ye <yisye@cisco.com>
Fri, 3 May 2013 22:51:45 +0000 (15:51 -0700)
committerJason Ye <yisye@cisco.com>
Fri, 3 May 2013 23:50:48 +0000 (16:50 -0700)
Signed-off-by: Jason Ye <yisye@cisco.com>
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/ITopologyServiceShimListener.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java

index 0c30c80f0df48fe4a5ca35fee756cdd7f698a464..e90823726c926d70304666055445b06df7ee4558 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -9,11 +8,13 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow;
 
+import java.util.List;
 import java.util.Set;
 
 import org.opendaylight.controller.sal.core.Edge;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 
 /**
  * The Interface provides Edge updates to the topology listeners
@@ -21,18 +22,17 @@ import org.opendaylight.controller.sal.core.UpdateType;
 public interface ITopologyServiceShimListener {
     /**
      * Called to update on Edge in the topology graph
-     *
-     * @param edge                     {@link org.opendaylight.controller.sal.core.Edge} being updated
-     * @param type             {@link org.opendaylight.controller.sal.core.UpdateType}
-     * @param props            set of {@link org.opendaylight.controller.sal.core.Property} like
-     *                                                 {@link org.opendaylight.controller.sal.core.Bandwidth} and/or
-     *                                                 {@link org.opendaylight.controller.sal.core.Latency} etc.
+     * 
+     * @param topoedgeupdateList
+     *            List of topoedgeupdates Each topoedgeupdate includes edge, its
+     *            Properties ( BandWidth and/or Latency etc) and update type.
      */
-    public void edgeUpdate(Edge edge, UpdateType type, Set<Property> props);
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList);
 
     /**
-     * Called when an Edge utilization is above the safe threshold configured
-     * on the controller
+     * Called when an Edge utilization is above the safe threshold configured on
+     * the controller
+     * 
      * @param {@link org.opendaylight.controller.sal.core.Edge}
      */
     public void edgeOverUtilized(Edge edge);
@@ -40,7 +40,7 @@ public interface ITopologyServiceShimListener {
     /**
      * Called when the Edge utilization is back to normal, below the safety
      * threshold level configured on the controller
-     *
+     * 
      * @param {@link org.opendaylight.controller.sal.core.Edge}
      */
     public void edgeUtilBackToNormal(Edge edge);
index 5c51cc5862d5f4c391cd956c1c8b59e1f03059ce..9a05c3f4f100c46b2c357f59ae6256f4e497f491 100644 (file)
@@ -207,7 +207,6 @@ public class SwitchHandler implements ISwitch {
         }
         executor.shutdown();
 
-        selector = null;
         msgReadWriteService = null;
 
         if (switchHandlerThread != null) {
@@ -313,7 +312,9 @@ public class SwitchHandler implements ISwitch {
         List<OFMessage> msgs = null;
 
         try {
-            msgs = msgReadWriteService.readMessages();
+            if (msgReadWriteService != null) {
+                msgs = msgReadWriteService.readMessages();
+            }
         } catch (Exception e) {
             reportError(e);
         }
index af502a2177d12c89101ca5b2363f90a9eeabe4da..873b53ff65aa23ececb17d733d64fb438f3f6098 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +45,7 @@ import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.State;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.discovery.IDiscoveryService;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
 
 /**
@@ -52,8 +54,8 @@ import org.opendaylight.controller.sal.utils.GlobalConstants;
  * container configurations.
  */
 public class TopologyServiceShim implements IDiscoveryService,
-               IContainerListener, CommandProvider, IRefreshInternalProvider,
-               IInventoryShimExternalListener {
+        IContainerListener, CommandProvider, IRefreshInternalProvider,
+        IInventoryShimExternalListener {
     protected static final Logger logger = LoggerFactory
             .getLogger(TopologyServiceShim.class);
     private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
@@ -76,19 +78,31 @@ public class TopologyServiceShim implements IDiscoveryService,
 
     class NotifyEntry {
         String container;
-        Pair<Edge, Set<Property>> edgeProps;
-        UpdateType type;
+        List<TopoEdgeUpdate> teuList;
 
-        NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
-                UpdateType type) {
+        public NotifyEntry(String container, TopoEdgeUpdate teu) {
             this.container = container;
-            this.edgeProps = edgeProps;
-            this.type = type;
+            this.teuList = new ArrayList<TopoEdgeUpdate>();
+            if (teu != null) {
+                this.teuList.add(teu);
+            }
+        }
+
+        public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
+            this.container = container;
+            this.teuList = new ArrayList<TopoEdgeUpdate>();
+            if (teuList != null) {
+                this.teuList.addAll(teuList);
+            }
         }
     }
 
     class TopologyNotify implements Runnable {
         private final BlockingQueue<NotifyEntry> notifyQ;
+        private NotifyEntry entry;
+        private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
+        private List<TopoEdgeUpdate> teuList;
+        private boolean notifyListeners;
 
         TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
             this.notifyQ = notifyQ;
@@ -97,22 +111,37 @@ public class TopologyServiceShim implements IDiscoveryService,
         public void run() {
             while (true) {
                 try {
-                    NotifyEntry entry = notifyQ.take();
-
-                    ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
-                            .get(entry.container);
-                    topologServiceShimListener.edgeUpdate(
-                            entry.edgeProps.getLeft(), entry.type,
-                            entry.edgeProps.getRight());
+                    teuMap.clear();
+                    notifyListeners = false;
+                    while (!notifyQ.isEmpty()) {
+                        entry = notifyQ.take();
+                        teuList = teuMap.get(entry.container);
+                        if (teuList == null) {
+                            teuList = new ArrayList<TopoEdgeUpdate>();
+                        }
+                        // group all the updates together
+                        teuList.addAll(entry.teuList);
+                        teuMap.put(entry.container, teuList);
+                        notifyListeners = true;
+                    }
+                    
+                    if (notifyListeners) {
+                        for (String container : teuMap.keySet()) {
+                            // notify the listener
+                            topologyServiceShimListeners.get(container)
+                                    .edgeUpdate(teuMap.get(container));
+                        }
+                    }
 
-                    entry = null;
+                    Thread.sleep(100);
                 } catch (InterruptedException e1) {
-                    logger.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    logger.warn("TopologyNotify interrupted {}",
+                            e1.getMessage());
                     if (shuttingDown) {
                         return;
                     }
                 } catch (Exception e2) {
-                    logger.error("",e2);
+                    logger.error("", e2);
                 }
             }
         }
@@ -165,7 +194,7 @@ public class TopologyServiceShim implements IDiscoveryService,
                         return;
                     }
                 } catch (Exception e2) {
-                    logger.error("",e2);
+                    logger.error("", e2);
                 }
             }
         }
@@ -353,7 +382,8 @@ public class TopologyServiceShim implements IDiscoveryService,
                 && this.topologyServiceShimListeners.get(containerName).equals(
                         s)) {
             this.topologyServiceShimListeners.remove(containerName);
-            logger.trace("Removed topologyServiceShimListener for container: {}",
+            logger.trace(
+                    "Removed topologyServiceShimListener for container: {}",
                     containerName);
         }
     }
@@ -370,6 +400,7 @@ public class TopologyServiceShim implements IDiscoveryService,
 
     private void removeNodeConnector(String container,
             NodeConnector nodeConnector) {
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                 .get(container);
         if (edgePropsMap == null) {
@@ -381,7 +412,8 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (edgeProps == null) {
             return;
         }
-        notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+        teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+                UpdateType.REMOVED));
 
         // Remove edge in another direction
         edgeProps = edgePropsMap
@@ -389,52 +421,130 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (edgeProps == null) {
             return;
         }
-        notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+        teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+                UpdateType.REMOVED));
+
+        // Update in one shot
+        notifyEdge(container, teuList);
     }
 
-    private void notifyEdge(String container, Edge edge, UpdateType type,
-            Set<Property> props) {
+    /**
+     * Update local cache and return true if it needs to notify upper layer
+     * Topology listeners.
+     * 
+     * @param container
+     *            The network container
+     * @param edge
+     *            The edge
+     * @param type
+     *            The update type
+     * @param props
+     *            The edge properties
+     * @return true if it needs to notify upper layer Topology listeners
+     */
+    private boolean updateLocalEdgeMap(String container, Edge edge,
+            UpdateType type, Set<Property> props) {
         ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                 .get(container);
         NodeConnector src = edge.getTailNodeConnector();
         Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
                 edge, props);
+        boolean rv = false;
 
         switch (type) {
         case ADDED:
         case CHANGED:
             if (edgePropsMap == null) {
                 edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
+                rv = true;
             } else {
                 if (edgePropsMap.containsKey(src)
                         && edgePropsMap.get(src).equals(edgeProps)) {
-                    // Entry already exists. Return here.
-                    return;
+                    // Entry already exists. No update.
+                    rv = false;
+                } else {
+                    rv = true;
                 }
             }
-            edgePropsMap.put(src, edgeProps);
-            edgeMap.put(container, edgePropsMap);
+            if (rv) {
+                edgePropsMap.put(src, edgeProps);
+                edgeMap.put(container, edgePropsMap);
+            }
             break;
         case REMOVED:
-            if (edgePropsMap != null) {
+            if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
                 edgePropsMap.remove(src);
                 if (edgePropsMap.isEmpty()) {
                     edgeMap.remove(container);
                 } else {
                     edgeMap.put(container, edgePropsMap);
                 }
+                rv = true;
             }
             break;
         default:
-            logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
-                    type, edge, container);
+            logger.debug(
+                    "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
+                    new Object[] { type.getName(), edge, container });
+        }
+
+        if (rv) {
+            logger.debug(
+                    "notifyLocalEdgeMap: {} for Edge {} in container {}",
+                    new Object[] { type.getName(), edge, container });
+        }
+        
+        return rv;
+    }
+
+    private void notifyEdge(String container, Edge edge, UpdateType type,
+            Set<Property> props) {
+        boolean notifyListeners;
+        
+        // Update local cache
+        notifyListeners = updateLocalEdgeMap(container, edge, type, props);
+
+        // Prepare to update TopologyService
+        if (notifyListeners) {
+            notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
+                    type)));
+            logger.debug("notifyEdge: {} Edge {} in container {}", 
+                    new Object[] { type.getName(), edge, container });
+        }
+    }
+
+    private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
+        if (etuList == null) {
             return;
         }
 
-        notifyQ.add(new NotifyEntry(container, edgeProps, type));
+        Edge edge;
+        UpdateType type;
+        List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
+        boolean notifyListeners = false, rv;
+
+        for (TopoEdgeUpdate etu : etuList) {
+            edge = etu.getEdge();
+            type = etu.getUpdateType();
+
+            // Update local cache
+            rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
+            if (rv) {
+                if (!notifyListeners) {
+                    notifyListeners = true;
+                }
+                etuNotifyList.add(etu);
+                logger.debug(
+                        "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
+                        new Object[] { type.getName(), edge, container });
+            }
+        }
 
-        logger.debug("notifyEdge: {} Edge {} in container {}",
-                type, edge, container);
+        // Prepare to update TopologyService
+        if (notifyListeners) {
+            notifyQ.add(new NotifyEntry(container, etuNotifyList));
+            logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
+        }
     }
 
     @Override
@@ -572,9 +682,12 @@ public class TopologyServiceShim implements IDiscoveryService,
             }
 
             long bw = 0;
-            for (Property prop : edgeProps.getRight()) {
-                if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
-                    bw = ((Bandwidth) prop).getValue();
+            Set<Property> props = edgeProps.getRight();
+            if (props != null) {
+                for (Property prop : props) {
+                    if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
+                        bw = ((Bandwidth) prop).getValue();
+                    }
                 }
             }
             count++;
@@ -638,14 +751,18 @@ public class TopologyServiceShim implements IDiscoveryService,
             return;
         }
         int i = 0;
+        List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
         for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
             if (edgeProps != null) {
                 i++;
+                teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
+                        .getRight(), UpdateType.ADDED));
                 logger.trace("Add edge {}", edgeProps.getLeft());
-                topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
-                        UpdateType.ADDED, edgeProps.getRight());
             }
         }
+        if (i > 0) {
+            topologServiceShimListener.edgeUpdate(teuList);
+        }
         logger.debug("Sent {} updates", i);
     }
 
@@ -663,7 +780,7 @@ public class TopologyServiceShim implements IDiscoveryService,
         if (conList != null) {
             containers.addAll(conList);
         }
-        
+
         switch (type) {
         case ADDED:
             break;
index c0b296345e261e18c6a0d0e3c58816183c734331..b23737c52037058d246d18bad2acd32caf52abc6 100644 (file)
@@ -10,9 +10,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.Dictionary;
 import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
@@ -20,8 +17,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.opendaylight.controller.sal.core.Edge;
-import org.opendaylight.controller.sal.core.Property;
-import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
 import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
@@ -130,11 +125,8 @@ public class TopologyServices implements ITopologyServiceShimListener,
     }
 
     @Override
-    public void edgeUpdate(Edge edge, UpdateType type, Set<Property> props) {
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
         if (this.salTopoService != null) {
-            List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
-            TopoEdgeUpdate teu = new TopoEdgeUpdate(edge, props, type);
-            topoedgeupdateList.add(teu);
             this.salTopoService.edgeUpdate(topoedgeupdateList);
         }
     }