package org.opendaylight.controller.protocol_plugin.openflow.internal;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.sal.core.State;
import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.discovery.IDiscoveryService;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.sal.utils.GlobalConstants;
/**
* container configurations.
*/
public class TopologyServiceShim implements IDiscoveryService,
- IContainerListener, CommandProvider, IRefreshInternalProvider,
- IInventoryShimExternalListener {
+ IContainerListener, CommandProvider, IRefreshInternalProvider,
+ IInventoryShimExternalListener {
protected static final Logger logger = LoggerFactory
.getLogger(TopologyServiceShim.class);
private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
class NotifyEntry {
String container;
- Pair<Edge, Set<Property>> edgeProps;
- UpdateType type;
+ List<TopoEdgeUpdate> teuList;
- NotifyEntry(String container, Pair<Edge, Set<Property>> edgeProps,
- UpdateType type) {
+ public NotifyEntry(String container, TopoEdgeUpdate teu) {
this.container = container;
- this.edgeProps = edgeProps;
- this.type = type;
+ this.teuList = new ArrayList<TopoEdgeUpdate>();
+ if (teu != null) {
+ this.teuList.add(teu);
+ }
+ }
+
+ public NotifyEntry(String container, List<TopoEdgeUpdate> teuList) {
+ this.container = container;
+ this.teuList = new ArrayList<TopoEdgeUpdate>();
+ if (teuList != null) {
+ this.teuList.addAll(teuList);
+ }
}
}
class TopologyNotify implements Runnable {
private final BlockingQueue<NotifyEntry> notifyQ;
+ private NotifyEntry entry;
+ private Map<String, List<TopoEdgeUpdate>> teuMap = new HashMap<String, List<TopoEdgeUpdate>>();
+ private List<TopoEdgeUpdate> teuList;
+ private boolean notifyListeners;
TopologyNotify(BlockingQueue<NotifyEntry> notifyQ) {
this.notifyQ = notifyQ;
public void run() {
while (true) {
try {
- NotifyEntry entry = notifyQ.take();
-
- ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
- .get(entry.container);
- topologServiceShimListener.edgeUpdate(
- entry.edgeProps.getLeft(), entry.type,
- entry.edgeProps.getRight());
+ teuMap.clear();
+ notifyListeners = false;
+ while (!notifyQ.isEmpty()) {
+ entry = notifyQ.take();
+ teuList = teuMap.get(entry.container);
+ if (teuList == null) {
+ teuList = new ArrayList<TopoEdgeUpdate>();
+ }
+ // group all the updates together
+ teuList.addAll(entry.teuList);
+ teuMap.put(entry.container, teuList);
+ notifyListeners = true;
+ }
+
+ if (notifyListeners) {
+ for (String container : teuMap.keySet()) {
+ // notify the listener
+ topologyServiceShimListeners.get(container)
+ .edgeUpdate(teuMap.get(container));
+ }
+ }
- entry = null;
+ Thread.sleep(100);
} catch (InterruptedException e1) {
- logger.warn("TopologyNotify interrupted {}", e1.getMessage());
+ logger.warn("TopologyNotify interrupted {}",
+ e1.getMessage());
if (shuttingDown) {
return;
}
} catch (Exception e2) {
- logger.error("",e2);
+ logger.error("", e2);
}
}
}
return;
}
} catch (Exception e2) {
- logger.error("",e2);
+ logger.error("", e2);
}
}
}
&& this.topologyServiceShimListeners.get(containerName).equals(
s)) {
this.topologyServiceShimListeners.remove(containerName);
- logger.trace("Removed topologyServiceShimListener for container: {}",
+ logger.trace(
+ "Removed topologyServiceShimListener for container: {}",
containerName);
}
}
private void removeNodeConnector(String container,
NodeConnector nodeConnector) {
+ List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
if (edgePropsMap == null) {
if (edgeProps == null) {
return;
}
- notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+ UpdateType.REMOVED));
// Remove edge in another direction
edgeProps = edgePropsMap
if (edgeProps == null) {
return;
}
- notifyEdge(container, edgeProps.getLeft(), UpdateType.REMOVED, null);
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), null,
+ UpdateType.REMOVED));
+
+ // Update in one shot
+ notifyEdge(container, teuList);
}
- private void notifyEdge(String container, Edge edge, UpdateType type,
- Set<Property> props) {
+ /**
+ * Update local cache and return true if it needs to notify upper layer
+ * Topology listeners.
+ *
+ * @param container
+ * The network container
+ * @param edge
+ * The edge
+ * @param type
+ * The update type
+ * @param props
+ * The edge properties
+ * @return true if it needs to notify upper layer Topology listeners
+ */
+ private boolean updateLocalEdgeMap(String container, Edge edge,
+ UpdateType type, Set<Property> props) {
ConcurrentMap<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
NodeConnector src = edge.getTailNodeConnector();
Pair<Edge, Set<Property>> edgeProps = new ImmutablePair<Edge, Set<Property>>(
edge, props);
+ boolean rv = false;
switch (type) {
case ADDED:
case CHANGED:
if (edgePropsMap == null) {
edgePropsMap = new ConcurrentHashMap<NodeConnector, Pair<Edge, Set<Property>>>();
+ rv = true;
} else {
if (edgePropsMap.containsKey(src)
&& edgePropsMap.get(src).equals(edgeProps)) {
- // Entry already exists. Return here.
- return;
+ // Entry already exists. No update.
+ rv = false;
+ } else {
+ rv = true;
}
}
- edgePropsMap.put(src, edgeProps);
- edgeMap.put(container, edgePropsMap);
+ if (rv) {
+ edgePropsMap.put(src, edgeProps);
+ edgeMap.put(container, edgePropsMap);
+ }
break;
case REMOVED:
- if (edgePropsMap != null) {
+ if ((edgePropsMap != null) && edgePropsMap.containsKey(src)) {
edgePropsMap.remove(src);
if (edgePropsMap.isEmpty()) {
edgeMap.remove(container);
} else {
edgeMap.put(container, edgePropsMap);
}
+ rv = true;
}
break;
default:
- logger.debug("notifyEdge: invalid {} for Edge {} in container {}",
- type, edge, container);
+ logger.debug(
+ "notifyLocalEdgeMap: invalid {} for Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+
+ if (rv) {
+ logger.debug(
+ "notifyLocalEdgeMap: {} for Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+
+ return rv;
+ }
+
+ private void notifyEdge(String container, Edge edge, UpdateType type,
+ Set<Property> props) {
+ boolean notifyListeners;
+
+ // Update local cache
+ notifyListeners = updateLocalEdgeMap(container, edge, type, props);
+
+ // Prepare to update TopologyService
+ if (notifyListeners) {
+ notifyQ.add(new NotifyEntry(container, new TopoEdgeUpdate(edge, props,
+ type)));
+ logger.debug("notifyEdge: {} Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+ }
+
+ private void notifyEdge(String container, List<TopoEdgeUpdate> etuList) {
+ if (etuList == null) {
return;
}
- notifyQ.add(new NotifyEntry(container, edgeProps, type));
+ Edge edge;
+ UpdateType type;
+ List<TopoEdgeUpdate> etuNotifyList = new ArrayList<TopoEdgeUpdate>();
+ boolean notifyListeners = false, rv;
+
+ for (TopoEdgeUpdate etu : etuList) {
+ edge = etu.getEdge();
+ type = etu.getUpdateType();
+
+ // Update local cache
+ rv = updateLocalEdgeMap(container, edge, type, etu.getProperty());
+ if (rv) {
+ if (!notifyListeners) {
+ notifyListeners = true;
+ }
+ etuNotifyList.add(etu);
+ logger.debug(
+ "notifyEdge(TopoEdgeUpdate): {} Edge {} in container {}",
+ new Object[] { type.getName(), edge, container });
+ }
+ }
- logger.debug("notifyEdge: {} Edge {} in container {}",
- type, edge, container);
+ // Prepare to update TopologyService
+ if (notifyListeners) {
+ notifyQ.add(new NotifyEntry(container, etuNotifyList));
+ logger.debug("notifyEdge(TopoEdgeUpdate): add notifyQ");
+ }
}
@Override
}
long bw = 0;
- for (Property prop : edgeProps.getRight()) {
- if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
- bw = ((Bandwidth) prop).getValue();
+ Set<Property> props = edgeProps.getRight();
+ if (props != null) {
+ for (Property prop : props) {
+ if (prop.getName().equals(Bandwidth.BandwidthPropName)) {
+ bw = ((Bandwidth) prop).getValue();
+ }
}
}
count++;
return;
}
int i = 0;
+ List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
for (Pair<Edge, Set<Property>> edgeProps : edgePropMap.values()) {
if (edgeProps != null) {
i++;
+ teuList.add(new TopoEdgeUpdate(edgeProps.getLeft(), edgeProps
+ .getRight(), UpdateType.ADDED));
logger.trace("Add edge {}", edgeProps.getLeft());
- topologServiceShimListener.edgeUpdate(edgeProps.getLeft(),
- UpdateType.ADDED, edgeProps.getRight());
}
}
+ if (i > 0) {
+ topologServiceShimListener.edgeUpdate(teuList);
+ }
logger.debug("Sent {} updates", i);
}
if (conList != null) {
containers.addAll(conList);
}
-
+
switch (type) {
case ADDED:
break;