From de5d869a80fe14b0fa3e96f0cd7e6dccac8b7f7d Mon Sep 17 00:00:00 2001 From: Jason Ye Date: Fri, 3 May 2013 15:51:45 -0700 Subject: [PATCH] As part of performance enhancements, Topology updates are now batched as much as possible. This will effectively reduce the number of Routing updates to the listeners. Signed-off-by: Jason Ye --- .../ITopologyServiceShimListener.java | 22 +- .../openflow/core/internal/SwitchHandler.java | 5 +- .../internal/TopologyServiceShim.java | 197 ++++++++++++++---- .../openflow/internal/TopologyServices.java | 10 +- 4 files changed, 172 insertions(+), 62 deletions(-) diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/ITopologyServiceShimListener.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/ITopologyServiceShimListener.java index 0c30c80f0d..e90823726c 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/ITopologyServiceShimListener.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/ITopologyServiceShimListener.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -9,11 +8,13 @@ package org.opendaylight.controller.protocol_plugin.openflow; +import java.util.List; import java.util.Set; import org.opendaylight.controller.sal.core.Edge; import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.UpdateType; +import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; /** * The Interface provides Edge updates to the topology listeners @@ -21,18 +22,17 @@ import org.opendaylight.controller.sal.core.UpdateType; public interface ITopologyServiceShimListener { /** * Called to update on Edge in the topology graph - * - * @param edge {@link org.opendaylight.controller.sal.core.Edge} being updated - * @param type {@link org.opendaylight.controller.sal.core.UpdateType} - * @param props set of {@link org.opendaylight.controller.sal.core.Property} like - * {@link org.opendaylight.controller.sal.core.Bandwidth} and/or - * {@link org.opendaylight.controller.sal.core.Latency} etc. + * + * @param topoedgeupdateList + * List of topoedgeupdates Each topoedgeupdate includes edge, its + * Properties ( BandWidth and/or Latency etc) and update type. */ - public void edgeUpdate(Edge edge, UpdateType type, Set props); + public void edgeUpdate(List topoedgeupdateList); /** - * Called when an Edge utilization is above the safe threshold configured - * on the controller + * Called when an Edge utilization is above the safe threshold configured on + * the controller + * * @param {@link org.opendaylight.controller.sal.core.Edge} */ public void edgeOverUtilized(Edge edge); @@ -40,7 +40,7 @@ public interface ITopologyServiceShimListener { /** * Called when the Edge utilization is back to normal, below the safety * threshold level configured on the controller - * + * * @param {@link org.opendaylight.controller.sal.core.Edge} */ public void edgeUtilBackToNormal(Edge edge); diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 5c51cc5862..9a05c3f4f1 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -207,7 +207,6 @@ public class SwitchHandler implements ISwitch { } executor.shutdown(); - selector = null; msgReadWriteService = null; if (switchHandlerThread != null) { @@ -313,7 +312,9 @@ public class SwitchHandler implements ISwitch { List msgs = null; try { - msgs = msgReadWriteService.readMessages(); + if (msgReadWriteService != null) { + msgs = msgReadWriteService.readMessages(); + } } catch (Exception e) { reportError(e); } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java index af502a2177..873b53ff65 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,6 +45,7 @@ import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.State; import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.discovery.IDiscoveryService; +import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; import org.opendaylight.controller.sal.utils.GlobalConstants; /** @@ -52,8 +54,8 @@ import org.opendaylight.controller.sal.utils.GlobalConstants; * container configurations. */ public class TopologyServiceShim implements IDiscoveryService, - IContainerListener, CommandProvider, IRefreshInternalProvider, - IInventoryShimExternalListener { + IContainerListener, CommandProvider, IRefreshInternalProvider, + IInventoryShimExternalListener { protected static final Logger logger = LoggerFactory .getLogger(TopologyServiceShim.class); private ConcurrentMap topologyServiceShimListeners = new ConcurrentHashMap(); @@ -76,19 +78,31 @@ public class TopologyServiceShim implements IDiscoveryService, class NotifyEntry { String container; - Pair> edgeProps; - UpdateType type; + List teuList; - NotifyEntry(String container, Pair> edgeProps, - UpdateType type) { + public NotifyEntry(String container, TopoEdgeUpdate teu) { this.container = container; - this.edgeProps = edgeProps; - this.type = type; + this.teuList = new ArrayList(); + if (teu != null) { + this.teuList.add(teu); + } + } + + public NotifyEntry(String container, List teuList) { + this.container = container; + this.teuList = new ArrayList(); + if (teuList != null) { + this.teuList.addAll(teuList); + } } } class TopologyNotify implements Runnable { private final BlockingQueue notifyQ; + private NotifyEntry entry; + private Map> teuMap = new HashMap>(); + private List teuList; + private boolean notifyListeners; TopologyNotify(BlockingQueue notifyQ) { this.notifyQ = notifyQ; @@ -97,22 +111,37 @@ public class TopologyServiceShim implements IDiscoveryService, public void run() { while (true) { try { - NotifyEntry entry = notifyQ.take(); - - ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners - .get(entry.container); - topologServiceShimListener.edgeUpdate( - entry.edgeProps.getLeft(), entry.type, - entry.edgeProps.getRight()); + teuMap.clear(); + notifyListeners = false; + while (!notifyQ.isEmpty()) { + entry = notifyQ.take(); + teuList = teuMap.get(entry.container); + if (teuList == null) { + teuList = new ArrayList(); + } + // group all the updates together + teuList.addAll(entry.teuList); + teuMap.put(entry.container, teuList); + notifyListeners = true; + } + + if (notifyListeners) { + for (String container : teuMap.keySet()) { + // notify the listener + topologyServiceShimListeners.get(container) + .edgeUpdate(teuMap.get(container)); + } + } - entry = null; + Thread.sleep(100); } catch (InterruptedException e1) { - logger.warn("TopologyNotify interrupted {}", e1.getMessage()); + logger.warn("TopologyNotify interrupted {}", + e1.getMessage()); if (shuttingDown) { return; } } catch (Exception e2) { - logger.error("",e2); + logger.error("", e2); } } } @@ -165,7 +194,7 @@ public class TopologyServiceShim implements IDiscoveryService, return; } } catch (Exception e2) { - logger.error("",e2); + logger.error("", e2); } } } @@ -353,7 +382,8 @@ public class TopologyServiceShim implements IDiscoveryService, && this.topologyServiceShimListeners.get(containerName).equals( s)) { this.topologyServiceShimListeners.remove(containerName); - logger.trace("Removed topologyServiceShimListener for container: {}", + logger.trace( + "Removed topologyServiceShimListener for container: {}", containerName); } } @@ -370,6 +400,7 @@ public class TopologyServiceShim implements IDiscoveryService, private void removeNodeConnector(String container, NodeConnector nodeConnector) { + List teuList = new ArrayList(); Map>> edgePropsMap = edgeMap .get(container); if (edgePropsMap == null) { @@ -381,7 +412,8 @@ public class TopologyServiceShim implements IDiscoveryService, if (edgeProps == null) { return; } - notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null); + teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null, + UpdateType.REMOVED)); // Remove edge in another direction edgeProps = edgePropsMap @@ -389,52 +421,130 @@ public class TopologyServiceShim implements IDiscoveryService, if (edgeProps == null) { return; } - notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null); + teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null, + UpdateType.REMOVED)); + + // Update in one shot + notifyEdge(container, teuList); } - private void notifyEdge(String container, Edge edge, UpdateType type, - Set props) { + /** + * Update local cache and return true if it needs to notify upper layer + * Topology listeners. + * + * @param container + * The network container + * @param edge + * The edge + * @param type + * The update type + * @param props + * The edge properties + * @return true if it needs to notify upper layer Topology listeners + */ + private boolean updateLocalEdgeMap(String container, Edge edge, + UpdateType type, Set props) { ConcurrentMap>> edgePropsMap = edgeMap .get(container); NodeConnector src = edge.getTailNodeConnector(); Pair> edgeProps = new ImmutablePair>( edge, props); + boolean rv = false; switch (type) { case ADDED: case CHANGED: if (edgePropsMap == null) { edgePropsMap = new ConcurrentHashMap>>(); + rv = true; } else { if (edgePropsMap.containsKey(src) && edgePropsMap.get(src).equals(edgeProps)) { - // Entry already exists. Return here. - return; + // Entry already exists. No update. + rv = false; + } else { + rv = true; } } - edgePropsMap.put(src, edgeProps); - edgeMap.put(container, edgePropsMap); + if (rv) { + edgePropsMap.put(src, edgeProps); + edgeMap.put(container, edgePropsMap); + } break; case REMOVED: - if (edgePropsMap != null) { + if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) { edgePropsMap.remove(src); if (edgePropsMap.isEmpty()) { edgeMap.remove(container); } else { edgeMap.put(container, edgePropsMap); } + rv = true; } break; default: - logger.debug("notifyEdge: invalid {} for Edge {} in container {}", - type, edge, container); + logger.debug( + "notifyLocalEdgeMap: invalid {} for Edge {} in container {}", + new Object[] { type.getName(), edge, container }); + } + + if (rv) { + logger.debug( + "notifyLocalEdgeMap: {} for Edge {} in container {}", + new Object[] { type.getName(), edge, container }); + } + + return rv; + } + + private void notifyEdge(String container, Edge edge, UpdateType type, + Set props) { + boolean notifyListeners; + + // Update local cache + notifyListeners = updateLocalEdgeMap(container, edge, type, props); + + // Prepare to update TopologyService + if (notifyListeners) { + notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props, + type))); + logger.debug("notifyEdge: {} Edge {} in container {}", + new Object[] { type.getName(), edge, container }); + } + } + + private void notifyEdge(String container, List etuList) { + if (etuList == null) { return; } - notifyQ.add(new NotifyEntry(container, edgeProps, type)); + Edge edge; + UpdateType type; + List etuNotifyList = new ArrayList(); + boolean notifyListeners = false, rv; + + for (TopoEdgeUpdate etu : etuList) { + edge = etu.getEdge(); + type = etu.getUpdateType(); + + // Update local cache + rv = updateLocalEdgeMap(container, edge, type, etu.getProperty()); + if (rv) { + if (!notifyListeners) { + notifyListeners = true; + } + etuNotifyList.add(etu); + logger.debug( + "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}", + new Object[] { type.getName(), edge, container }); + } + } - logger.debug("notifyEdge: {} Edge {} in container {}", - type, edge, container); + // Prepare to update TopologyService + if (notifyListeners) { + notifyQ.add(new NotifyEntry(container, etuNotifyList)); + logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ"); + } } @Override @@ -572,9 +682,12 @@ public class TopologyServiceShim implements IDiscoveryService, } long bw = 0; - for (Property prop : edgeProps.getRight()) { - if (prop.getName().equals(Bandwidth.BandwidthPropName)) { - bw = ((Bandwidth) prop).getValue(); + Set props = edgeProps.getRight(); + if (props != null) { + for (Property prop : props) { + if (prop.getName().equals(Bandwidth.BandwidthPropName)) { + bw = ((Bandwidth) prop).getValue(); + } } } count++; @@ -638,14 +751,18 @@ public class TopologyServiceShim implements IDiscoveryService, return; } int i = 0; + List teuList = new ArrayList(); for (Pair> edgeProps : edgePropMap.values()) { if (edgeProps != null) { i++; + teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps + .getRight(), UpdateType.ADDED)); logger.trace("Add edge {}", edgeProps.getLeft()); - topologServiceShimListener.edgeUpdate(edgeProps.getLeft(), - UpdateType.ADDED, edgeProps.getRight()); } } + if (i > 0) { + topologServiceShimListener.edgeUpdate(teuList); + } logger.debug("Sent {} updates", i); } @@ -663,7 +780,7 @@ public class TopologyServiceShim implements IDiscoveryService, if (conList != null) { containers.addAll(conList); } - + switch (type) { case ADDED: break; diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java index c0b296345e..b23737c520 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java @@ -10,9 +10,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal; import java.util.Dictionary; import java.util.List; -import java.util.Set; -import java.util.ArrayList; - import org.apache.felix.dm.Component; import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider; import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener; @@ -20,8 +17,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.opendaylight.controller.sal.core.Edge; -import org.opendaylight.controller.sal.core.Property; -import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.topology.IPluginInTopologyService; import org.opendaylight.controller.sal.topology.IPluginOutTopologyService; import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; @@ -130,11 +125,8 @@ public class TopologyServices implements ITopologyServiceShimListener, } @Override - public void edgeUpdate(Edge edge, UpdateType type, Set props) { + public void edgeUpdate(List topoedgeupdateList) { if (this.salTopoService != null) { - List topoedgeupdateList = new ArrayList(); - TopoEdgeUpdate teu = new TopoEdgeUpdate(edge, props, type); - topoedgeupdateList.add(teu); this.salTopoService.edgeUpdate(topoedgeupdateList); } } -- 2.36.6