2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import org.apache.commons.lang3.tuple.ImmutablePair;
12 import org.apache.felix.dm.Component;
13 import org.eclipse.osgi.framework.console.CommandInterpreter;
14 import org.eclipse.osgi.framework.console.CommandProvider;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.configuration.ConfigurationObject;
21 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
22 import org.opendaylight.controller.configuration.IConfigurationContainerService;
23 import org.opendaylight.controller.sal.core.Edge;
24 import org.opendaylight.controller.sal.core.Host;
25 import org.opendaylight.controller.sal.core.Node;
26 import org.opendaylight.controller.sal.core.NodeConnector;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.controller.sal.core.TimeStamp;
29 import org.opendaylight.controller.sal.core.UpdateType;
30 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
31 import org.opendaylight.controller.sal.topology.ITopologyService;
32 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
33 import org.opendaylight.controller.sal.utils.IObjectReader;
34 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
35 import org.opendaylight.controller.sal.utils.Status;
36 import org.opendaylight.controller.sal.utils.StatusCode;
37 import org.opendaylight.controller.switchmanager.IInventoryListener;
38 import org.opendaylight.controller.switchmanager.ISwitchManager;
39 import org.opendaylight.controller.topologymanager.ITopologyManager;
40 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
41 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
42 import org.opendaylight.controller.topologymanager.ITopologyManagerShell;
43 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
44 import org.osgi.framework.BundleContext;
45 import org.osgi.framework.FrameworkUtil;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import java.io.FileNotFoundException;
50 import java.io.IOException;
51 import java.io.ObjectInputStream;
52 import java.util.ArrayList;
53 import java.util.Dictionary;
54 import java.util.EnumSet;
55 import java.util.HashMap;
56 import java.util.HashSet;
57 import java.util.Iterator;
58 import java.util.LinkedList;
59 import java.util.List;
62 import java.util.Timer;
63 import java.util.TimerTask;
64 import java.util.concurrent.BlockingQueue;
65 import java.util.concurrent.ConcurrentHashMap;
66 import java.util.concurrent.ConcurrentMap;
67 import java.util.concurrent.CopyOnWriteArraySet;
68 import java.util.concurrent.LinkedBlockingQueue;
71 * The class describes TopologyManager which is the central repository of the
72 * network topology. It provides service for applications to interact with
73 * topology database and notifies all the listeners of topology changes.
75 public class TopologyManagerImpl implements
76 ICacheUpdateAware<Object, Object>,
78 ITopologyManagerShell,
79 IConfigurationContainerAware,
84 protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
85 protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
86 protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
87 protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
88 private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
89 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
90 private static final long PENDING_UPDATE_TIMEOUT = 5000L;
92 private ITopologyService topoService;
93 private IClusterContainerServices clusterContainerService;
94 private IConfigurationContainerService configurationService;
95 private ISwitchManager switchManager;
96 // DB of all the Edges with properties which constitute our topology
97 private ConcurrentMap<Edge, Set<Property>> edgesDB;
98 // DB of all NodeConnector which are part of ISL Edges, meaning they
99 // are connected to another NodeConnector on the other side of an ISL link.
100 // NodeConnector of a Production Edge is not part of this DB.
101 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
102 // DB of all the NodeConnectors with an Host attached to it
103 private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
104 // Topology Manager Aware listeners
105 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
106 // Topology Manager Aware listeners - for clusterwide updates
107 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
108 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
109 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
110 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
111 private volatile Boolean shuttingDown = false;
112 private Thread notifyThread;
113 private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
114 new HashMap<NodeConnector, List<PendingUpdateTask>>();
115 private final BlockingQueue<TopoEdgeUpdate> updateQ =
116 new LinkedBlockingQueue<TopoEdgeUpdate>();
117 private Timer pendingTimer;
118 private Thread updateThread;
// Marker subclass of TopoEdgeUpdate. PendingUpdateTask.flush() wraps re-injected
// updates in this type, and doEdgeUpdate() tests `instanceof PendingEdgeUpdate`
// to tell them apart from freshly received updates.
120 private class PendingEdgeUpdate extends TopoEdgeUpdate {
121 private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
// Runnable executed by the "Topology Update" thread: batches the edge updates
// queued in updateQ for as long as shuttingDown is false.
126 private class UpdateTopology implements Runnable {
129 log.trace("Start topology update thread");
131 while (!shuttingDown) {
133 List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
// Block until at least one update is available, then drain whatever else is
// already queued without blocking (poll() returns null when the queue is empty).
134 TopoEdgeUpdate teu = updateQ.take();
135 for (; teu != null; teu = updateQ.poll()) {
139 if (!list.isEmpty()) {
140 log.trace("Update edges: {}", list);
143 } catch (InterruptedException e) {
147 log.warn("Topology update thread interrupted", e);
// Any other failure is logged; NOTE(review): presumably the loop then continues - confirm.
148 } catch (Exception e) {
149 log.error("Exception on topology update thread", e);
153 log.trace("Exit topology update thread");
// TimerTask holding one edge update (edge, properties, update type) that is parked
// in pendingUpdates and scheduled on pendingTimer with PENDING_UPDATE_TIMEOUT.
157 private class PendingUpdateTask extends TimerTask {
158 private final Edge edge;
159 private final Set<Property> props;
160 private final UpdateType type;
162 private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
// Head node connector of the held edge; this is the key used in pendingUpdates.
168 private NodeConnector getHeadNodeConnector() {
169 return edge.getHeadNodeConnector();
// Re-injects the held update into updateQ, wrapped as a PendingEdgeUpdate.
172 private void flush() {
173 log.info("Flush pending topology update: edge {}, type {}",
175 updateQ.add(new PendingEdgeUpdate(edge, props, type));
// Timeout path: only warns when this task was still registered as pending.
180 if (removePendingEvent(this)) {
181 log.warn("Pending topology update timed out: edge{}, type {}",
187 void nonClusterObjectCreate() {
188 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
189 hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
190 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
191 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
194 void setTopologyManagerAware(ITopologyManagerAware s) {
195 if (this.topologyManagerAware != null) {
196 log.debug("Adding ITopologyManagerAware: {}", s);
197 this.topologyManagerAware.add(s);
201 void unsetTopologyManagerAware(ITopologyManagerAware s) {
202 if (this.topologyManagerAware != null) {
203 log.debug("Removing ITopologyManagerAware: {}", s);
204 this.topologyManagerAware.remove(s);
208 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
209 if (this.topologyManagerClusterWideAware != null) {
210 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
211 this.topologyManagerClusterWideAware.add(s);
215 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
216 if (this.topologyManagerClusterWideAware != null) {
217 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
218 this.topologyManagerClusterWideAware.remove(s);
222 void setTopoService(ITopologyService s) {
223 log.debug("Adding ITopologyService: {}", s);
224 this.topoService = s;
227 void unsetTopoService(ITopologyService s) {
228 if (this.topoService == s) {
229 log.debug("Removing ITopologyService: {}", s);
230 this.topoService = null;
234 void setClusterContainerService(IClusterContainerServices s) {
235 log.debug("Cluster Service set");
236 this.clusterContainerService = s;
239 void unsetClusterContainerService(IClusterContainerServices s) {
240 if (this.clusterContainerService == s) {
241 log.debug("Cluster Service removed!");
242 this.clusterContainerService = null;
246 public void setConfigurationContainerService(IConfigurationContainerService service) {
247 log.trace("Got configuration service set request {}", service);
248 this.configurationService = service;
251 public void unsetConfigurationContainerService(IConfigurationContainerService service) {
252 log.trace("Got configuration service UNset request");
253 this.configurationService = null;
256 void setSwitchManager(ISwitchManager s) {
257 log.debug("Adding ISwitchManager: {}", s);
258 this.switchManager = s;
261 void unsetSwitchManager(ISwitchManager s) {
262 if (this.switchManager == s) {
263 log.debug("Removing ISwitchManager: {}", s);
264 this.switchManager = null;
269 * Function called by the dependency manager when all the required
270 * dependencies are satisfied
// Initialization callback: reads the "containerName" service property of the
// component, registers the OSGi console commands, clears the shutdown flag and
// builds the worker threads and the pending-update timer.
273 void init(Component c) {
276 String containerName = null;
277 Dictionary<?, ?> props = c.getServiceProperties();
279 containerName = (String) props.get("containerName");
281 // In the Global instance case the containerName is empty
282 containerName = "UNKNOWN";
285 registerWithOSGIConsole();
288 // Restore the shuttingDown status on init of the component
289 shuttingDown = false;
// The threads are only constructed here; start() is called on them elsewhere.
290 notifyThread = new Thread(new TopologyNotify(notifyQ));
291 pendingTimer = new Timer("Topology Pending Update Timer");
292 updateThread = new Thread(new UpdateTopology(), "Topology Update");
// Requests the four topology caches (edges, hosts, node connectors, user links)
// through allocateCache(), each in TRANSACTIONAL mode. The casts are unchecked
// because allocateCache() returns ConcurrentMap<?, ?>.
295 @SuppressWarnings({ "unchecked" })
296 private void allocateCaches() {
298 (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
301 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
303 this.nodeConnectorsDB =
304 (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
305 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
307 (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
308 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
// Asks the cluster container service to create the cache `cacheName` with the
// given modes. Both creation failures are only logged, so `cache` keeps its
// initial null value in those cases.
// NOTE(review): the return statement is not visible here; presumably `cache` is returned - confirm.
311 private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
312 ConcurrentMap<?, ?> cache = null;
314 cache = this.clusterContainerService.createCache(cacheName, cacheModes);
// An already existing cache is an expected situation, hence debug level only.
315 } catch (CacheExistException e) {
316 log.debug(cacheName + " cache already exists - destroy and recreate if needed");
317 } catch (CacheConfigException e) {
318 log.error(cacheName + " cache configuration invalid - check cache mode");
// Looks up the four topology caches by name on the cluster container service and
// logs an error for each one that cannot be obtained. The casts are unchecked
// because the caches are fetched by name only.
323 @SuppressWarnings({ "unchecked" })
324 private void retrieveCaches() {
// Nothing can be retrieved without the cluster service.
325 if (this.clusterContainerService == null) {
326 log.error("Cluster Services is null, can't retrieve caches.");
330 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
331 if (edgesDB == null) {
332 log.error("Failed to get cache for " + TOPOEDGESDB);
336 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
337 if (hostsDB == null) {
338 log.error("Failed to get cache for " + TOPOHOSTSDB);
341 this.nodeConnectorsDB =
342 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
343 if (nodeConnectorsDB == null) {
344 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
348 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
349 if (userLinksDB == null) {
350 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
355 * Function called after the topology manager has registered the service in
356 * OSGi service registry.
// NOTE(review): the method header is not visible in this view; presumably this is
// the body of the "started" lifecycle callback described just above - confirm.
// Starts both worker threads, then asks the topology service for a refresh.
360 updateThread.start();
362 // Start the batcher thread for the cluster wide topology updates
363 notifyThread.start();
364 // SollicitRefresh MUST be called here else if called at init
365 // time it may sollicit refresh too soon.
366 log.debug("Sollicit topology refresh");
367 topoService.sollicitRefresh();
// NOTE(review): the method header is not visible in this view; presumably this is
// the body of the stop/shutdown callback - confirm.
// Interrupts both worker threads and cancels the pending-update timer.
372 updateThread.interrupt();
373 notifyThread.interrupt();
374 pendingTimer.cancel();
378 * Function called by the dependency manager when at least one dependency
379 * become unsatisfied or when the component is shutting down because for
380 * example bundle is being stopped.
391 private void loadConfiguration() {
392 for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
393 addUserLink((TopologyUserLinkConfig) conf);
398 public Status saveConfig() {
399 return saveConfigInternal();
// Persists a snapshot of all user links under USER_LINKS_FILE_NAME via the
// configuration service. A failed save is turned into an INTERNALERROR status
// that carries the original description.
// NOTE(review): the success-path return is not visible here - confirm.
402 public Status saveConfigInternal() {
403 Status saveStatus = configurationService.persistConfiguration(
404 new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
406 if (!saveStatus.isSuccess()) {
407 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
// Builds a fresh Node -> edges map from the keys of edgesDB, looking at both the
// tail and the head node of every edge.
// NOTE(review): the handling of a null edgesDB and the lines adding the edge to
// each set are not visible here - confirm.
413 public Map<Node, Set<Edge>> getNodeEdges() {
414 if (this.edgesDB == null) {
418 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
419 for (Edge edge : this.edgesDB.keySet()) {
420 // Lets analyze the tail
421 Node node = edge.getTailNodeConnector().getNode();
422 Set<Edge> nodeEdges = res.get(node);
// First edge seen for this node: create its set lazily.
423 if (nodeEdges == null) {
424 nodeEdges = new HashSet<Edge>();
425 res.put(node, nodeEdges);
429 // Lets analyze the head
430 node = edge.getHeadNodeConnector().getNode();
431 nodeEdges = res.get(node);
432 if (nodeEdges == null) {
433 nodeEdges = new HashSet<Edge>();
434 res.put(node, nodeEdges);
// Tells whether node connector `p` has an entry in nodeConnectorsDB.
// NOTE(review): the result for a null nodeConnectorsDB is not visible here - confirm.
443 public boolean isInternal(NodeConnector p) {
444 if (this.nodeConnectorsDB == null) {
448 // This is an internal NodeConnector if is connected to
449 // another Node i.e it's part of the nodeConnectorsDB
450 return (this.nodeConnectorsDB.get(p) != null);
454 * This method returns true if the edge is an ISL link.
458 * @return true if it is an ISL link
460 public boolean isISLink(Edge e) {
461 return (!isProductionLink(e));
465 * This method returns true if the edge is a production link.
469 * @return true if it is a production link
471 public boolean isProductionLink(Edge e) {
472 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
473 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
477 * This method cross checks the determination of nodeConnector type by Discovery Service
478 * against the information in SwitchManager and updates it accordingly.
// For each end point of `e` typed PRODUCTION, asks updateNCTypeFromSwitchMgr() for
// a re-typed connector and stores it back into the edge (the edge is modified in place).
// NOTE(review): the guards around the two setters are not visible here - confirm.
482 private void crossCheckNodeConnectors(Edge e) {
484 if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
485 nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
487 e.setHeadNodeConnector(nc);
490 if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
491 nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
493 e.setTailNodeConnector(nc);
499 * A NodeConnector may have been categorized as of type Production by Discovery Service.
500 * But at the time when this determination was made, only OF nodes were known to Discovery
501 * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
502 * so, then it returns a new NodeConnector with correct type.
505 * NodeConnector as passed on in the edge
507 * If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
508 * with correct type, null otherwise
// Scans the nodes known to the switch manager for one whose node ID string equals
// that of nc's node. On a match, builds a new NodeConnector with that node's type,
// nc's ID and the matched node, and returns it.
511 private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
513 for (Node node : switchManager.getNodes()) {
514 String nodeName = node.getNodeIDString();
515 log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
516 nc.getNode().getNodeIDString());
// The match is made on the ID string only, not on the node type.
517 if (nodeName.equals(nc.getNode().getNodeIDString())) {
518 NodeConnector nodeConnector = NodeConnectorCreator
519 .createNodeConnector(node.getType(), nc.getID(), node);
520 return nodeConnector;
527 * The Map returned is a copy of the current topology hence if the topology
528 * changes the copy doesn't
530 * @return A Map representing the current topology expressed as edges of the
// Copies edgesDB into a new HashMap: the Edge keys are reused, each property set
// is duplicated.
// NOTE(review): the handling of a null edgesDB and the return are not visible here - confirm.
534 public Map<Edge, Set<Property>> getEdges() {
535 if (this.edgesDB == null) {
539 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
541 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
542 // Sets of props are copied because the composition of
543 // those properties could change with time
544 props = new HashSet<Property>(edgeEntry.getValue());
545 // We can simply reuse the key because the object is
546 // immutable so doesn't really matter that we are
547 // referencing the only owned by a different table, the
548 // meaning is the same because doesn't change with time.
549 edgeMap.put(edgeEntry.getKey(), props);
// Returns a new HashSet holding the node connectors that are keys of hostsDB.
// NOTE(review): the result for a null hostsDB is not visible here - confirm.
556 public Set<NodeConnector> getNodeConnectorWithHost() {
557 if (this.hostsDB == null) {
561 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
// Groups the keys of hostsDB into a local Node -> node connectors map.
// NOTE(review): how `node` is derived from `nc`, the insertion into portSet and the
// return are not visible here - confirm.
565 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
566 if (this.hostsDB == null) {
569 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
571 Set<NodeConnector> portSet;
572 for (NodeConnector nc : this.hostsDB.keySet()) {
574 portSet = res.get(node);
575 if (portSet == null) {
576 // Create the HashSet if null
577 portSet = new HashSet<NodeConnector>();
578 res.put(node, portSet);
581 // Keep updating the HashSet, given this is not a
582 // clustered map we can just update the set without
583 // worrying to update the hashmap.
// Single-host view over getHostsAttachedToNodeConnector(); the visible guard
// requires a non-null, non-empty list.
// NOTE(review): which element is returned, and the fallback value, are not visible here - confirm.
591 public Host getHostAttachedToNodeConnector(NodeConnector port) {
592 List<Host> hosts = getHostsAttachedToNodeConnector(port);
593 if(hosts != null && !hosts.isEmpty()){
// Collects the Host (left side of each stored pair) of every entry recorded in
// hostsDB for node connector `p`.
// NOTE(review): the result when hostsDB is null or holds nothing for `p` is not
// visible here - confirm.
600 public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
601 Set<ImmutablePair<Host, Set<Property>>> hosts;
// `hosts` is assigned inside the condition so the map is read only once.
602 if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
605 // create a list of hosts
606 List<Host> retHosts = new LinkedList<Host>();
607 for(ImmutablePair<Host, Set<Property>> host : hosts) {
608 retHosts.add(host.getLeft());
// Adds or removes the (host, properties) pair recorded for `port` in hostsDB.
// The method is synchronized on this instance.
// NOTE(review): the dispatch on the update type `t` is not visible here; the
// add/remove sections below presumably belong to different cases - confirm.
614 public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
616 // Clone the property set in case non null else just
617 // create an empty one. Caches allocated via infinispan
618 // don't allow null values
620 props = new HashSet<Property>();
622 props = new HashSet<Property>(props);
624 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
627 Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
628 if(hostSet == null) {
629 hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
// The set is written back with put() after being modified.
634 hostSet.add(thisHost);
635 this.hostsDB.put(port, hostSet);
638 hostSet.remove(thisHost);
639 if(hostSet.isEmpty()) {
640 //remove only if hasn't been concurrently modified
641 this.hostsDB.remove(port, hostSet);
643 this.hostsDB.put(port, hostSet);
649 private boolean headNodeConnectorExist(Edge e) {
651 * Only check the head end point which is supposed to be part of a
652 * network node we control (present in our inventory). If we checked the
653 * tail end point as well, we would not store the edges that connect to
654 * a non sdn enable port on a non sdn capable production switch. We want
655 * to be able to see these switches on the topology.
657 NodeConnector head = e.getHeadNodeConnector();
658 return (switchManager.doesNodeConnectorExist(head));
// Parks an edge update under the edge's head node connector in pendingUpdates and
// schedules a PendingUpdateTask for it on pendingTimer after PENDING_UPDATE_TIMEOUT.
// NOTE(review): the line adding the task to the list is not visible here - confirm.
661 private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
662 NodeConnector head = e.getHeadNodeConnector();
663 PendingUpdateTask task = new PendingUpdateTask(e, p, t);
// pendingUpdates is a plain HashMap, so every access goes through this monitor.
664 synchronized (pendingUpdates) {
665 List<PendingUpdateTask> list = pendingUpdates.get(head);
667 list = new LinkedList<PendingUpdateTask>();
668 pendingUpdates.put(head, list);
671 pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
// Looks up the pending list of the edge's head node connector and, on the visible
// path, wraps the update in a new PendingUpdateTask scheduled with the same timeout.
// NOTE(review): the condition on `list` and the returned values are not visible
// here; presumably true means the update was queued behind pending ones - confirm.
675 private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
676 NodeConnector head = e.getHeadNodeConnector();
677 synchronized (pendingUpdates) {
678 List<PendingUpdateTask> list = pendingUpdates.get(head);
680 log.warn("Enqueue edge update: edge {}, type {}", e, t);
681 PendingUpdateTask task = new PendingUpdateTask(e, p, t);
683 pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
// Removes one specific task from the pending list of its head node connector and
// drops the map entry once that list becomes empty.
// NOTE(review): the null guard on `list` and the return are not visible here;
// presumably `removed` is returned - confirm.
691 private boolean removePendingEvent(PendingUpdateTask t) {
693 NodeConnector head = t.getHeadNodeConnector();
694 boolean removed = false;
696 synchronized (pendingUpdates) {
697 List<PendingUpdateTask> list = pendingUpdates.get(head);
699 removed = list.remove(t);
700 if (list.isEmpty()) {
701 pendingUpdates.remove(head);
// Detaches every pending task registered for `head`, cancels each of them and
// purges the cancelled tasks from pendingTimer. `doFlush` is combined with the
// result of cancel().
// NOTE(review): the statement run when both are true is not visible here;
// presumably task.flush() - confirm.
709 private void removePendingEvent(NodeConnector head, boolean doFlush) {
710 List<PendingUpdateTask> list;
// Only the map removal happens under the monitor; the tasks are processed outside it.
711 synchronized (pendingUpdates) {
712 list = pendingUpdates.remove(head);
716 for (PendingUpdateTask task : list) {
717 if (task.cancel() && doFlush) {
721 pendingTimer.purge();
725 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
726 return edgeUpdate(e, type, props, false);
// Applies one edge update to edgesDB and nodeConnectorsDB and returns the
// TopoEdgeUpdate describing what was applied.
// NOTE(review): the statement selecting between the add / remove / change sections
// and the early returns are not visible in this view; the section comments below
// describe the visible statements only - confirm the exact branch structure.
729 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
// Updates other than ADDED are first offered to the pending queue of the head connector.
730 if (!type.equals(UpdateType.ADDED) &&
731 enqueueEventIfPending(e, props, type)) {
737 if (this.edgesDB.containsKey(e)) {
738 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
739 log.trace("Skipping redundant edge addition: {}", e);
743 // Ensure that head node connector exists
// When the head connector exists, updates parked for it are released; otherwise
// this update is parked with addPendingEvent().
745 if (headNodeConnectorExist(e)) {
746 removePendingEvent(e.getHeadNodeConnector(), true);
748 log.warn("Ignore edge that contains invalid node connector: {}",
750 addPendingEvent(e, props, type);
755 // Make sure the props are non-null or create a copy
757 props = new HashSet<Property>();
759 props = new HashSet<Property>(props);
762 // Check if nodeConnectors of the edge were correctly categorized
763 // by protocol plugin
764 crossCheckNodeConnectors(e);
766 // Now make sure there is the creation timestamp for the
767 // edge, if not there, stamp with the first update
768 boolean found_create = false;
769 for (Property prop : props) {
770 if (prop instanceof TimeStamp) {
771 TimeStamp t = (TimeStamp) prop;
772 if (t.getTimeStampName().equals("creation")) {
780 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
784 // Now add this in the database eventually overriding
785 // something that may have been already existing
786 this.edgesDB.put(e, props);
788 // Now populate the DB of NodeConnectors
789 // NOTE WELL: properties are empty sets, not really needed
791 // The DB only contains ISL ports
793 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
794 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
796 log.trace("Edge {} {}", e.toString(), type.name());
799 // Now remove the edge from edgesDB
800 this.edgesDB.remove(e);
802 // Now lets update the NodeConnectors DB, the assumption
803 // here is that two NodeConnector are exclusively
804 // connected by 1 and only 1 edge, this is reasonable in
805 // the same plug (virtual or physical) we can assume two
806 // cables won't be plugged. This could break only in case
807 // of devices in the middle that acts as hubs, but it
808 // should be safe to assume that won't happen.
809 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
810 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
811 log.trace("Edge {} {}", e.toString(), type.name());
814 Set<Property> oldProps = this.edgesDB.get(e);
816 // When property(s) changes lets make sure we can change it
817 // all except the creation time stamp because that should
818 // be set only when the edge is created
819 TimeStamp timeStamp = null;
820 if (oldProps != null) {
821 for (Property prop : oldProps) {
822 if (prop instanceof TimeStamp) {
823 TimeStamp tsProp = (TimeStamp) prop;
824 if (tsProp.getTimeStampName().equals("creation")) {
832 // Now lets make sure new properties are non-null
834 props = new HashSet<Property>();
836 // Copy the set so noone is going to change the content
837 props = new HashSet<Property>(props);
840 // Now lets remove the creation property if exist in the
// An explicit Iterator is used so entries can be removed while iterating.
842 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
843 Property prop = i.next();
844 if (prop instanceof TimeStamp) {
845 TimeStamp t = (TimeStamp) prop;
846 if (t.getTimeStampName().equals("creation")) {
847 if (timeStamp != null) {
855 // Now lets add the creation timestamp in it
856 if (timeStamp != null) {
857 props.add(timeStamp);
861 this.edgesDB.put(e, props);
862 log.trace("Edge {} {}", e.toString(), type.name());
865 return new TopoEdgeUpdate(e, props, type);
// Applies a batch of edge updates one by one through edgeUpdate() and then hands
// the resulting list to every registered ITopologyManagerAware listener.
// NOTE(review): the line collecting non-null results into teuList is not visible
// here - confirm.
868 private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
869 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
870 for (TopoEdgeUpdate teu : topoedgeupdateList) {
// Updates re-injected by PendingUpdateTask.flush() arrive as PendingEdgeUpdate.
871 boolean isPending = (teu instanceof PendingEdgeUpdate);
872 Edge e = teu.getEdge();
873 Set<Property> p = teu.getProperty();
874 UpdateType type = teu.getUpdateType();
875 TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
876 if (update != null) {
881 if (!teuList.isEmpty()) {
882 // Now update the listeners
883 for (ITopologyManagerAware s : this.topologyManagerAware) {
// A failing listener is logged and does not prevent the others from being notified.
885 s.edgeUpdate(teuList);
886 } catch (Exception exc) {
887 log.error("Exception on edge update:", exc);
894 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
895 updateQ.addAll(topoedgeupdateList);
898 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
899 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
900 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
901 return getLinkTuple(rLink);
// Parses the source and destination node connector strings of a user link and
// builds the Edge (source, destination) from them.
// NOTE(review): what happens when the Edge constructor throws is not visible
// here; presumably null is returned - confirm.
905 private Edge getLinkTuple(TopologyUserLinkConfig link) {
906 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
907 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
909 return new Edge(srcNodeConnector, dstNodeConnector);
910 } catch (Exception e) {
916 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
917 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
// Validates and registers a user configured link, then injects the corresponding
// edge(s) into the topology.
// Visible outcomes: BADREQUEST for an invalid configuration, CONFLICT for a
// duplicate configuration or name, NOTFOUND for an invalid node connector,
// SUCCESS otherwise.
// NOTE(review): the condition guarding the NOTFOUND rollback is not visible here - confirm.
921 public Status addUserLink(TopologyUserLinkConfig userLink) {
922 if (!userLink.isValid()) {
923 return new Status(StatusCode.BADREQUEST,
924 "User link configuration invalid.");
// The link starts as LINKDOWN and is switched to SUCCESS further down.
926 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
928 //Check if this link already configured
929 //NOTE: infinispan cache doesn't support Map.containsValue()
930 // (which is linear time in most ConcurrentMap impl anyway)
931 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
932 if (existingLink.equals(userLink)) {
933 return new Status(StatusCode.CONFLICT, "Link configuration exists");
936 //attempt put, if mapping for this key already existed return conflict
937 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
938 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
939 + " already exists. Please use another name");
// Forward direction; production links are not injected into the topology.
942 Edge linkTuple = getLinkTuple(userLink);
943 if (linkTuple != null) {
944 if (!isProductionLink(linkTuple)) {
945 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
946 new HashSet<Property>());
// Roll the registration back before reporting the failure.
948 userLinksDB.remove(userLink.getName());
949 return new Status(StatusCode.NOTFOUND,
950 "Link configuration contains invalid node connector: "
// Reverse direction.
955 linkTuple = getReverseLinkTuple(userLink);
956 if (linkTuple != null) {
957 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
958 if (!isProductionLink(linkTuple)) {
959 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
963 return new Status(StatusCode.SUCCESS);
// Removes the user link named `linkName` from userLinksDB and, when it existed and
// maps to an edge, issues REMOVED updates for both directions (production links
// are skipped). Returns BADREQUEST for a null name and SUCCESS otherwise, including
// when no link with that name was registered.
967 public Status deleteUserLink(String linkName) {
968 if (linkName == null) {
969 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
972 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
974 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
975 if (! isProductionLink(linkTuple)) {
976 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
// NOTE(review): the reverse tuple is used without a visible null check - confirm.
979 linkTuple = getReverseLinkTuple(link);
980 if (! isProductionLink(linkTuple)) {
981 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
984 return new Status(StatusCode.SUCCESS);
// Registers this instance in the OSGi service registry under the CommandProvider
// interface name, using a BundleContext obtained through the bundle that loaded
// this class.
987 private void registerWithOSGIConsole() {
988 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
990 bundleContext.registerService(CommandProvider.class.getName(), this,
995 public String getHelp() {
996 StringBuffer help = new StringBuffer();
997 help.append("---Topology Manager---\n");
998 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
999 help.append("\t deleteUserLink <name>\n");
1000 help.append("\t printUserLink\n");
1001 help.append("\t printNodeEdges\n");
1002 return help.toString();
1005 public void _printUserLink(CommandInterpreter ci) {
1006 for (String name : this.userLinksDB.keySet()) {
1007 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1008 ci.println("Name : " + name);
1009 ci.println(linkConfig);
1010 ci.println("Edge " + getLinkTuple(linkConfig));
1011 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
// Console command "addUserLink <name> <node connector string> <node connector string>":
// reads the three arguments, validates both node connector strings by parsing
// them, then calls addUserLink() and prints the returned status.
// NOTE(review): the early exits after each error message and the checks on
// nc1/nc2 are not visible here - confirm.
1015 public void _addUserLink(CommandInterpreter ci) {
1016 String name = ci.nextArgument();
1017 if ((name == null)) {
1018 ci.println("Please enter a valid Name");
1022 String ncStr1 = ci.nextArgument();
1023 if (ncStr1 == null) {
1024 ci.println("Please enter two node connector strings");
1027 String ncStr2 = ci.nextArgument();
1028 if (ncStr2 == null) {
1029 ci.println("Please enter second node connector string");
1033 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1035 ci.println("Invalid input node connector 1 string: " + ncStr1);
1038 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1040 ci.println("Invalid input node connector 2 string: " + ncStr2);
// The configuration is built from the raw strings, not from the parsed connectors.
1044 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1045 ci.println(this.addUserLink(config));
// Console command "deleteUserLink <name>": reads the link name and calls
// deleteUserLink(); the returned status is not printed.
// NOTE(review): the early exit after the error message is not visible here - confirm.
1048 public void _deleteUserLink(CommandInterpreter ci) {
1049 String name = ci.nextArgument();
1050 if ((name == null)) {
1051 ci.println("Please enter a valid Name");
1054 this.deleteUserLink(name);
// Console command "printNodeEdges": prints one "<node> <edge>" line for every
// edge of every node returned by getNodeEdges().
// NOTE(review): the statements inside the three null guards are not visible here - confirm.
1057 public void _printNodeEdges(CommandInterpreter ci) {
1058 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1059 if (nodeEdges == null) {
1062 Set<Node> nodeSet = nodeEdges.keySet();
1063 if (nodeSet == null) {
1066 ci.println(" Node Edge");
1067 for (Node node : nodeSet) {
1068 Set<Edge> edgeSet = nodeEdges.get(node);
1069 if (edgeSet == null) {
1072 for (Edge edge : edgeSet) {
1073 ci.println(node + " " + edge);
1079 public Object readObject(ObjectInputStream ois)
1080 throws FileNotFoundException, IOException, ClassNotFoundException {
1081 return ois.readObject();
1085 public Status saveConfiguration() {
1086 return saveConfig();
/**
 * Logs, at warn level, that utilization of the given edge is above normal.
 * No other action is taken in the visible code.
 *
 * @param edge the edge reported as over-utilized
 */
1090 public void edgeOverUtilized(Edge edge) {
1091 log.warn("Link Utilization above normal: {}", edge);
/**
 * Logs that utilization of the given edge is back to normal. Note that the
 * message is emitted at warn level. No other action is taken in the visible code.
 *
 * @param edge the edge whose utilization returned to normal
 */
1095 public void edgeUtilBackToNormal(Edge edge) {
1096 log.warn("Link Utilization back to normal: {}", edge);
/**
 * Builds a {@code TopoEdgeUpdate} from the given edge, properties and update
 * type, and marks it with the supplied local/remote flag.
 * <p>
 * NOTE(review): what is done with {@code upd} afterwards is not visible in
 * this listing - presumably it is queued for the notification thread;
 * confirm against the full file.
 *
 * @param e       edge the update refers to
 * @param type    kind of update
 * @param props   properties attached to the update; callers in this file pass null for some update types
 * @param isLocal value forwarded to {@code TopoEdgeUpdate.setLocal}
 */
1099 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
// Note the argument order: the constructor takes props before type.
1100 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
1101 upd.setLocal(isLocal);
/**
 * Receives a node update together with its type and property map.
 * <p>
 * NOTE(review): the body of this method is not visible in this listing -
 * confirm against the full file whether it performs any work.
 *
 * @param node    node the update refers to
 * @param type    kind of update
 * @param propMap properties of the node, keyed by string
 */
1106 public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
/**
 * Reacts to a node connector update by removing the pending edge events
 * held for that connector via {@code removePendingEvent}.
 * <p>
 * The flush flag passed along is true for every update type except
 * {@code UpdateType.REMOVED}. {@code propMap} is not read in the visible code.
 *
 * @param nc      node connector the update refers to
 * @param type    kind of update; decides whether pending events are flushed
 * @param propMap properties of the node connector, keyed by string
 */
1111 public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
1112 // Remove pending edge updates for the given node connector.
1113 // Pending events should be notified if the node connector exists.
1114 boolean doFlush = !type.equals(UpdateType.REMOVED);
1115 removePendingEvent(nc, doFlush);
/**
 * Cache callback for a newly created entry. When the cache is
 * {@code TOPOEDGESDB} the key is treated as an {@code Edge} and an
 * {@code UpdateType.ADDED} update without properties is passed to
 * {@code edgeUpdateClusterWide}.
 * <p>
 * NOTE(review): no handling for other cache names is visible in this
 * listing - confirm against the full file.
 *
 * @param key         key of the created entry; cast to {@code Edge} for TOPOEDGESDB
 * @param cacheName   name of the cache the entry belongs to
 * @param originLocal forwarded as the update's local flag
 */
1119 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
1120 if (cacheName.equals(TOPOEDGESDB)) {
1121 // This is the case of an Edge being added to the topology DB
1122 final Edge e = (Edge) key;
1123 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
// No property set is available on creation, hence null.
1124 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
/**
 * Cache callback for an updated entry. When the cache is
 * {@code TOPOEDGESDB} the key is treated as an {@code Edge}, the new value
 * as its property set, and an {@code UpdateType.CHANGED} update is passed
 * to {@code edgeUpdateClusterWide}.
 * <p>
 * NOTE(review): no handling for other cache names is visible in this
 * listing - confirm against the full file.
 *
 * @param key         key of the updated entry; cast to {@code Edge} for TOPOEDGESDB
 * @param new_value   new value of the entry; cast to {@code Set<Property>} for TOPOEDGESDB
 * @param cacheName   name of the cache the entry belongs to
 * @param originLocal forwarded as the update's local flag
 */
1129 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
1130 if (cacheName.equals(TOPOEDGESDB)) {
1131 final Edge e = (Edge) key;
1132 log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
// Unchecked cast: the element type cannot be verified at runtime.
1133 final Set<Property> props = (Set<Property>) new_value;
1134 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
/**
 * Cache callback for a deleted entry. When the cache is
 * {@code TOPOEDGESDB} the key is treated as an {@code Edge} and an
 * {@code UpdateType.REMOVED} update without properties is passed to
 * {@code edgeUpdateClusterWide}.
 * <p>
 * NOTE(review): no handling for other cache names is visible in this
 * listing - confirm against the full file.
 *
 * @param key         key of the deleted entry; cast to {@code Edge} for TOPOEDGESDB
 * @param cacheName   name of the cache the entry belonged to
 * @param originLocal forwarded as the update's local flag
 */
1139 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
1140 if (cacheName.equals(TOPOEDGESDB)) {
1141 final Edge e = (Edge) key;
1142 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
1143 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
/**
 * Runnable that takes {@code TopoEdgeUpdate} entries from a blocking queue
 * and hands {@code teuList} to every registered
 * {@code ITopologyManagerClusterWideAware} listener through {@code edgeUpdate}.
 * <p>
 * NOTE(review): several lines of this class are not visible in this listing
 * (the embedded line numbers are not contiguous) - among them the declaration
 * of the method holding the loop below, the body of the drain loop, and the
 * handlers of the two outer catch clauses. The comments here describe only
 * what is shown; confirm the rest against the full file.
 */
1147 class TopologyNotify implements Runnable {
// Queue the updates are taken from; supplied through the constructor.
1148 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
// Entry currently held by the drain loop.
1149 private TopoEdgeUpdate entry;
// Batch passed to the listeners; presumably filled by the drain loop - TODO confirm.
1150 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
// Reset at the start of each pass and set to true by the drain loop.
1151 private boolean notifyListeners;
/**
 * @param notifyQ queue from which edge updates are taken
 */
1153 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
1154 this.notifyQ = notifyQ;
1161 log.trace("New run of TopologyNotify");
1162 notifyListeners = false;
1163 // First we block waiting for an element to get in
1164 entry = notifyQ.take();
1165 // Then we drain the whole queue if elements are
1166 // in it without getting into any blocking condition
// poll() returns null once the queue is empty, which ends the loop.
1167 for (; entry != null; entry = notifyQ.poll()) {
1169 notifyListeners = true;
1172 // Notify listeners only if there were updates drained else
1174 if (notifyListeners) {
1175 log.trace("Notifier thread, notified a listener");
1176 // Now update the listeners
1177 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
// Every listener receives the same teuList instance.
1179 s.edgeUpdate(teuList);
// A listener failure is logged; presumably the loop then moves on to the next listener.
1180 } catch (Exception exc) {
1181 log.error("Exception on edge update:", exc);
1187 // Let's sleep for some time to allow aggregation of events
1189 } catch (InterruptedException e1) {
// Only the exception message is logged, not the stack trace.
1193 log.warn("TopologyNotify interrupted {}", e1.getMessage());
1194 } catch (Exception e2) {
/**
 * Builds a textual listing of the user links in {@code userLinksDB}: for
 * each entry four strings are appended - the name, the configuration's
 * {@code toString()}, the link tuple and the reverse link tuple.
 * <p>
 * NOTE(review): the return statement is not visible in this listing;
 * presumably {@code result} is returned - confirm against the full file.
 *
 * @return list of output lines, per the declared return type
 */
1201 public List<String> printUserLink() {
1202 List<String> result = new ArrayList<String>();
1203 for (String name : this.userLinksDB.keySet()) {
// Look the configuration up again by name for each key.
1204 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1205 result.add("Name : " + name);
1206 result.add(linkConfig.toString());
1207 result.add("Edge " + getLinkTuple(linkConfig));
1208 result.add("Reverse Edge " + getReverseLinkTuple(linkConfig));
/**
 * Creates a user link from a name and two node connector strings and
 * collects the outcome as text: a prompt for each absent argument, an
 * "Invalid input" message for a rejected connector string, or the
 * {@code toString()} of the value returned by {@code addUserLink(config)}.
 * <p>
 * NOTE(review): the embedded line numbers in this listing are not contiguous,
 * so the guard bodies (presumably early returns of {@code result}), the
 * conditions in front of the "Invalid input" messages (presumably a null
 * check on the parsed connector) and the final return are not visible here -
 * confirm against the full file.
 *
 * @param name   name of the user link
 * @param ncStr1 string form of the first node connector
 * @param ncStr2 string form of the second node connector
 * @return list of output lines, per the declared return type
 */
1213 public List<String> addUserLink(String name, String ncStr1, String ncStr2) {
1214 List<String> result = new ArrayList<String>();
1215 if ((name == null)) {
1216 result.add("Please enter a valid Name");
1220 if (ncStr1 == null) {
1221 result.add("Please enter two node connector strings");
1224 if (ncStr2 == null) {
1225 result.add("Please enter second node connector string");
// Both strings are parsed into NodeConnector objects; the configuration
// built below is still created from the raw strings, not from nc1/nc2.
1229 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1231 result.add("Invalid input node connector 1 string: " + ncStr1);
1234 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1236 result.add("Invalid input node connector 2 string: " + ncStr2);
// Delegate to the config-based overload and record its result as text.
1240 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1241 result.add(this.addUserLink(config).toString());
/**
 * Deletes the user link with the given name through
 * {@code deleteUserLink(name)}; when no name is given a prompt is added to
 * the result list.
 * <p>
 * NOTE(review): the body of the null guard and the return statement are cut
 * short in this listing; presumably the guard returns {@code result} before
 * the delete call - confirm against the full file. Nothing is added to the
 * list for the delete itself in the visible code.
 *
 * @param name name of the user link to delete
 * @return list of output lines, per the declared return type
 */
1245 public List<String> deleteUserLinkShell(String name) {
1246 List<String> result = new ArrayList<String>();
1247 if ((name == null)) {
1248 result.add("Please enter a valid Name");
1251 this.deleteUserLink(name);
/**
 * Builds a textual listing of the map returned by {@code getNodeEdges()}:
 * a header line followed by one line per (node, edge) pair.
 * <p>
 * NOTE(review): the statements executed when the map, its key set, or a
 * node's edge set is null, and the final return, are not visible in this
 * listing - presumably {@code result} is returned (or the node skipped);
 * confirm against the full file.
 *
 * @return list of output lines, per the declared return type
 */
1255 public List<String> printNodeEdges() {
1256 List<String> result = new ArrayList<String>();
1257 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1258 if (nodeEdges == null) {
1261 Set<Node> nodeSet = nodeEdges.keySet();
1262 if (nodeSet == null) {
// Column header for the node/edge lines added below.
1265 result.add(" Node Edge");
1266 for (Node node : nodeSet) {
1267 Set<Edge> edgeSet = nodeEdges.get(node);
1268 if (edgeSet == null) {
// One output line per edge attached to this node.
1271 for (Edge edge : edgeSet) {
1272 result.add(node + " " + edge);
1278 // Only for unit test.
1280 pendingTimer = new Timer("Topology Pending Update Timer");
1281 updateThread = new Thread(new UpdateTopology(), "Topology Update");
1282 updateThread.start();
1286 shuttingDown = true;
1287 updateThread.interrupt();
1288 pendingTimer.cancel();
1291 boolean flushUpdateQueue(long timeout) {
1292 long limit = System.currentTimeMillis() + timeout;
1295 if (updateQ.peek() == null) {
1301 } catch (InterruptedException e) {
1304 cur = System.currentTimeMillis();
1305 } while (cur < limit);