2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Date;
16 import java.util.Dictionary;
17 import java.util.EnumSet;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.topologymanager.ITopologyManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
59 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 * The class describes TopologyManager which is the central repository of the
67 * network topology. It provides service for applications to interact with
68 * topology database and notifies all the listeners of topology changes.
70 public class TopologyManagerImpl implements
73 IConfigurationContainerAware,
77 static final String TOPOEDGESDB = "topologymanager.edgesDB";
78 static final String TOPOHOSTSDB = "topologymanager.hostsDB";
79 static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
80 static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
81 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
82 private static final String SAVE = "Save";
83 private ITopologyService topoService;
84 private IClusterContainerServices clusterContainerService;
85 // DB of all the Edges with properties which constitute our topology
86 private ConcurrentMap<Edge, Set<Property>> edgesDB;
87 // DB of all NodeConnector which are part of ISL Edges, meaning they
88 // are connected to another NodeConnector on the other side of an ISL link.
89 // NodeConnector of a Production Edge is not part of this DB.
90 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
91 // DB of all the NodeConnectors with an Host attached to it
92 private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
93 // Topology Manager Aware listeners
94 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
95 // Topology Manager Aware listeners - for clusterwide updates
96 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
97 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
98 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
99 private String userLinksFileName;
100 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
101 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
102 private volatile Boolean shuttingDown = false;
103 private Thread notifyThread;
106 void nonClusterObjectCreate() {
107 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
108 hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
109 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
110 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
113 void setTopologyManagerAware(ITopologyManagerAware s) {
114 if (this.topologyManagerAware != null) {
115 log.debug("Adding ITopologyManagerAware: {}", s);
116 this.topologyManagerAware.add(s);
120 void unsetTopologyManagerAware(ITopologyManagerAware s) {
121 if (this.topologyManagerAware != null) {
122 log.debug("Removing ITopologyManagerAware: {}", s);
123 this.topologyManagerAware.remove(s);
127 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
128 if (this.topologyManagerClusterWideAware != null) {
129 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
130 this.topologyManagerClusterWideAware.add(s);
134 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
135 if (this.topologyManagerClusterWideAware != null) {
136 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
137 this.topologyManagerClusterWideAware.remove(s);
141 void setTopoService(ITopologyService s) {
142 log.debug("Adding ITopologyService: {}", s);
143 this.topoService = s;
146 void unsetTopoService(ITopologyService s) {
147 if (this.topoService == s) {
148 log.debug("Removing ITopologyService: {}", s);
149 this.topoService = null;
153 void setClusterContainerService(IClusterContainerServices s) {
154 log.debug("Cluster Service set");
155 this.clusterContainerService = s;
158 void unsetClusterContainerService(IClusterContainerServices s) {
159 if (this.clusterContainerService == s) {
160 log.debug("Cluster Service removed!");
161 this.clusterContainerService = null;
166 * Function called by the dependency manager when all the required
167 * dependencies are satisfied
170 void init(Component c) {
173 String containerName = null;
174 Dictionary<?, ?> props = c.getServiceProperties();
176 containerName = (String) props.get("containerName");
178 // In the Global instance case the containerName is empty
179 containerName = "UNKNOWN";
182 userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
183 registerWithOSGIConsole();
185 // Restore the shuttingDown status on init of the component
186 shuttingDown = false;
187 notifyThread = new Thread(new TopologyNotify(notifyQ));
190 @SuppressWarnings({ "unchecked", "deprecation" })
191 private void allocateCaches() {
194 (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
195 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
196 } catch (CacheExistException cee) {
197 log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
198 } catch (CacheConfigException cce) {
199 log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
204 (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
205 TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
206 } catch (CacheExistException cee) {
207 log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
208 } catch (CacheConfigException cce) {
209 log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
213 this.nodeConnectorsDB =
214 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
215 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
216 } catch (CacheExistException cee) {
217 log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
218 } catch (CacheConfigException cce) {
219 log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
224 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
225 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
226 } catch (CacheExistException cee) {
227 log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
228 } catch (CacheConfigException cce) {
229 log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
233 @SuppressWarnings({ "unchecked", "deprecation" })
234 private void retrieveCaches() {
235 if (this.clusterContainerService == null) {
236 log.error("Cluster Services is null, can't retrieve caches.");
240 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
241 if (edgesDB == null) {
242 log.error("Failed to get cache for " + TOPOEDGESDB);
246 (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
247 if (hostsDB == null) {
248 log.error("Failed to get cache for " + TOPOHOSTSDB);
251 this.nodeConnectorsDB =
252 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
253 if (nodeConnectorsDB == null) {
254 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
258 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
259 if (userLinksDB == null) {
260 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
265 * Function called after the topology manager has registered the service in
266 * OSGi service registry.
270 // Start the batcher thread for the cluster wide topology updates
271 notifyThread.start();
272 // SollicitRefresh MUST be called here else if called at init
273 // time it may sollicit refresh too soon.
274 log.debug("Sollicit topology refresh");
275 topoService.sollicitRefresh();
280 notifyThread.interrupt();
284 * Function called by the dependency manager when at least one dependency
285 * become unsatisfied or when the component is shutting down because for
286 * example bundle is being stopped.
294 @SuppressWarnings("unchecked")
295 private void loadConfiguration() {
296 ObjectReader objReader = new ObjectReader();
297 ConcurrentMap<String, TopologyUserLinkConfig> confList =
298 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
300 if (confList != null) {
301 for (TopologyUserLinkConfig conf : confList.values()) {
308 public Status saveConfig() {
309 return saveConfigInternal();
312 public Status saveConfigInternal() {
313 ObjectWriter objWriter = new ObjectWriter();
315 Status saveStatus = objWriter.write(
316 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
318 if (! saveStatus.isSuccess()) {
319 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
325 public Map<Node, Set<Edge>> getNodeEdges() {
326 if (this.edgesDB == null) {
330 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
331 for (Edge edge : this.edgesDB.keySet()) {
332 // Lets analyze the tail
333 Node node = edge.getTailNodeConnector().getNode();
334 Set<Edge> nodeEdges = res.get(node);
335 if (nodeEdges == null) {
336 nodeEdges = new HashSet<Edge>();
337 res.put(node, nodeEdges);
341 // Lets analyze the head
342 node = edge.getHeadNodeConnector().getNode();
343 nodeEdges = res.get(node);
344 if (nodeEdges == null) {
345 nodeEdges = new HashSet<Edge>();
346 res.put(node, nodeEdges);
355 public boolean isInternal(NodeConnector p) {
356 if (this.nodeConnectorsDB == null) {
360 // This is an internal NodeConnector if is connected to
361 // another Node i.e it's part of the nodeConnectorsDB
362 return (this.nodeConnectorsDB.get(p) != null);
366 * This method returns true if the edge is an ISL link.
370 * @return true if it is an ISL link
372 public boolean isISLink(Edge e) {
373 return (!isProductionLink(e));
377 * This method returns true if the edge is a production link.
381 * @return true if it is a production link
383 public boolean isProductionLink(Edge e) {
384 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
385 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
389 * The Map returned is a copy of the current topology hence if the topology
390 * changes the copy doesn't
392 * @return A Map representing the current topology expressed as edges of the
396 public Map<Edge, Set<Property>> getEdges() {
397 if (this.edgesDB == null) {
401 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
403 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
404 // Sets of props are copied because the composition of
405 // those properties could change with time
406 props = new HashSet<Property>(edgeEntry.getValue());
407 // We can simply reuse the key because the object is
408 // immutable so doesn't really matter that we are
409 // referencing the only owned by a different table, the
410 // meaning is the same because doesn't change with time.
411 edgeMap.put(edgeEntry.getKey(), props);
418 public Set<NodeConnector> getNodeConnectorWithHost() {
419 if (this.hostsDB == null) {
423 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
427 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
428 if (this.hostsDB == null) {
431 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
433 Set<NodeConnector> portSet;
434 for (NodeConnector nc : this.hostsDB.keySet()) {
436 portSet = res.get(node);
437 if (portSet == null) {
438 // Create the HashSet if null
439 portSet = new HashSet<NodeConnector>();
440 res.put(node, portSet);
443 // Keep updating the HashSet, given this is not a
444 // clustered map we can just update the set without
445 // worrying to update the hashmap.
453 public Host getHostAttachedToNodeConnector(NodeConnector port) {
454 ImmutablePair<Host, Set<Property>> host;
455 if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
458 return host.getLeft();
462 public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
464 // Clone the property set in case non null else just
465 // create an empty one. Caches allocated via infinispan
466 // don't allow null values
468 props = new HashSet<Property>();
470 props = new HashSet<Property>(props);
472 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
477 this.hostsDB.put(port, thisHost);
480 //remove only if hasn't been concurrently modified
481 this.hostsDB.remove(port, thisHost);
486 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
489 // Make sure the props are non-null
491 props = new HashSet<Property>();
493 props = new HashSet<Property>(props);
496 //in case of node switch-over to a different cluster controller,
497 //let's retain edge props
498 Set<Property> currentProps = this.edgesDB.get(e);
499 if (currentProps != null){
500 props.addAll(currentProps);
503 // Now make sure there is the creation timestamp for the
504 // edge, if not there, stamp with the first update
505 boolean found_create = false;
506 for (Property prop : props) {
507 if (prop instanceof TimeStamp) {
508 TimeStamp t = (TimeStamp) prop;
509 if (t.getTimeStampName().equals("creation")) {
517 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
521 // Now add this in the database eventually overriding
522 // something that may have been already existing
523 this.edgesDB.put(e, props);
525 // Now populate the DB of NodeConnectors
526 // NOTE WELL: properties are empty sets, not really needed
528 // The DB only contains ISL ports
530 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
531 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
533 log.trace("Edge {} {}", e.toString(), type.name());
536 // Now remove the edge from edgesDB
537 this.edgesDB.remove(e);
539 // Now lets update the NodeConnectors DB, the assumption
540 // here is that two NodeConnector are exclusively
541 // connected by 1 and only 1 edge, this is reasonable in
542 // the same plug (virtual of phisical) we can assume two
543 // cables won't be plugged. This could break only in case
544 // of devices in the middle that acts as hubs, but it
545 // should be safe to assume that won't happen.
546 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
547 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
548 log.trace("Edge {} {}", e.toString(), type.name());
551 Set<Property> oldProps = this.edgesDB.get(e);
553 // When property changes lets make sure we can change it
554 // all except the creation time stamp because that should
555 // be changed only when the edge is destroyed and created
557 TimeStamp timeStamp = null;
558 for (Property prop : oldProps) {
559 if (prop instanceof TimeStamp) {
560 TimeStamp tsProp = (TimeStamp) prop;
561 if (tsProp.getTimeStampName().equals("creation")) {
568 // Now lets make sure new properties are non-null
570 props = new HashSet<Property>();
572 // Copy the set so noone is going to change the content
573 props = new HashSet<Property>(props);
576 // Now lets remove the creation property if exist in the
578 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
579 Property prop = i.next();
580 if (prop instanceof TimeStamp) {
581 TimeStamp t = (TimeStamp) prop;
582 if (t.getTimeStampName().equals("creation")) {
589 // Now lets add the creation timestamp in it
590 if (timeStamp != null) {
591 props.add(timeStamp);
595 this.edgesDB.put(e, props);
596 log.trace("Edge {} {}", e.toString(), type.name());
599 return new TopoEdgeUpdate(e, props, type);
603 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
604 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
605 for (int i = 0; i < topoedgeupdateList.size(); i++) {
606 Edge e = topoedgeupdateList.get(i).getEdge();
607 Set<Property> p = topoedgeupdateList.get(i).getProperty();
608 UpdateType type = topoedgeupdateList.get(i).getUpdateType();
609 TopoEdgeUpdate teu = edgeUpdate(e, type, p);
613 // Now update the listeners
614 for (ITopologyManagerAware s : this.topologyManagerAware) {
616 s.edgeUpdate(teuList);
617 } catch (Exception exc) {
618 log.error("Exception on edge update:", exc);
624 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
625 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
626 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
627 return getLinkTuple(rLink);
631 private Edge getLinkTuple(TopologyUserLinkConfig link) {
632 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
633 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
635 return new Edge(srcNodeConnector, dstNodeConnector);
636 } catch (Exception e) {
642 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
643 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
647 public Status addUserLink(TopologyUserLinkConfig userLink) {
648 if (!userLink.isValid()) {
649 return new Status(StatusCode.BADREQUEST,
650 "User link configuration invalid.");
652 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
654 //Check if this link already configured
655 //NOTE: infinispan cache doesn't support Map.containsValue()
656 // (which is linear time in most ConcurrentMap impl anyway)
657 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
658 if (existingLink.equals(userLink)) {
659 return new Status(StatusCode.CONFLICT, "Link configuration exists");
662 //attempt put, if mapping for this key already existed return conflict
663 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
664 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
665 + " already exists. Please use another name");
668 Edge linkTuple = getLinkTuple(userLink);
669 if (linkTuple != null) {
670 if (!isProductionLink(linkTuple)) {
671 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
674 linkTuple = getReverseLinkTuple(userLink);
675 if (linkTuple != null) {
676 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
677 if (!isProductionLink(linkTuple)) {
678 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
682 return new Status(StatusCode.SUCCESS);
686 public Status deleteUserLink(String linkName) {
687 if (linkName == null) {
688 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
691 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
693 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
694 if (! isProductionLink(linkTuple)) {
695 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
698 linkTuple = getReverseLinkTuple(link);
699 if (! isProductionLink(linkTuple)) {
700 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
703 return new Status(StatusCode.SUCCESS);
706 private void registerWithOSGIConsole() {
707 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
709 bundleContext.registerService(CommandProvider.class.getName(), this,
714 public String getHelp() {
715 StringBuffer help = new StringBuffer();
716 help.append("---Topology Manager---\n");
717 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
718 help.append("\t deleteUserLink <name>\n");
719 help.append("\t printUserLink\n");
720 help.append("\t printNodeEdges\n");
721 return help.toString();
724 public void _printUserLink(CommandInterpreter ci) {
725 for (String name : this.userLinksDB.keySet()) {
726 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
727 ci.println("Name : " + name);
728 ci.println(linkConfig);
729 ci.println("Edge " + getLinkTuple(linkConfig));
730 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
734 public void _addUserLink(CommandInterpreter ci) {
735 String name = ci.nextArgument();
736 if ((name == null)) {
737 ci.println("Please enter a valid Name");
741 String ncStr1 = ci.nextArgument();
742 if (ncStr1 == null) {
743 ci.println("Please enter two node connector strings");
746 String ncStr2 = ci.nextArgument();
747 if (ncStr2 == null) {
748 ci.println("Please enter second node connector string");
752 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
754 ci.println("Invalid input node connector 1 string: " + ncStr1);
757 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
759 ci.println("Invalid input node connector 2 string: " + ncStr2);
763 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
764 ci.println(this.addUserLink(config));
767 public void _deleteUserLink(CommandInterpreter ci) {
768 String name = ci.nextArgument();
769 if ((name == null)) {
770 ci.println("Please enter a valid Name");
773 this.deleteUserLink(name);
776 public void _printNodeEdges(CommandInterpreter ci) {
777 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
778 if (nodeEdges == null) {
781 Set<Node> nodeSet = nodeEdges.keySet();
782 if (nodeSet == null) {
785 ci.println(" Node Edge");
786 for (Node node : nodeSet) {
787 Set<Edge> edgeSet = nodeEdges.get(node);
788 if (edgeSet == null) {
791 for (Edge edge : edgeSet) {
792 ci.println(node + " " + edge);
798 public Object readObject(ObjectInputStream ois)
799 throws FileNotFoundException, IOException, ClassNotFoundException {
800 return ois.readObject();
804 public Status saveConfiguration() {
809 public void edgeOverUtilized(Edge edge) {
810 log.warn("Link Utilization above normal: {}", edge);
814 public void edgeUtilBackToNormal(Edge edge) {
815 log.warn("Link Utilization back to normal: {}", edge);
818 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
819 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
820 upd.setLocal(isLocal);
825 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
826 if (cacheName.equals(TOPOEDGESDB)) {
827 // This is the case of an Edge being added to the topology DB
828 final Edge e = (Edge) key;
829 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
830 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
835 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
836 if (cacheName.equals(TOPOEDGESDB)) {
837 final Edge e = (Edge) key;
838 log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
839 final Set<Property> props = (Set<Property>) new_value;
840 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
845 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
846 if (cacheName.equals(TOPOEDGESDB)) {
847 final Edge e = (Edge) key;
848 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
849 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
853 class TopologyNotify implements Runnable {
854 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
855 private TopoEdgeUpdate entry;
856 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
857 private boolean notifyListeners;
859 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
860 this.notifyQ = notifyQ;
867 log.trace("New run of TopologyNotify");
868 notifyListeners = false;
869 // First we block waiting for an element to get in
870 entry = notifyQ.take();
871 // Then we drain the whole queue if elements are
872 // in it without getting into any blocking condition
873 for (; entry != null; entry = notifyQ.poll()) {
875 notifyListeners = true;
878 // Notify listeners only if there were updates drained else
880 if (notifyListeners) {
881 log.trace("Notifier thread, notified a listener");
882 // Now update the listeners
883 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
885 s.edgeUpdate(teuList);
886 } catch (Exception exc) {
887 log.error("Exception on edge update:", exc);
893 // Lets sleep for sometime to allow aggregation of event
895 } catch (InterruptedException e1) {
896 log.warn("TopologyNotify interrupted {}", e1.getMessage());
900 } catch (Exception e2) {