2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.switchmanager.ISwitchManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
60 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
61 import org.osgi.framework.BundleContext;
62 import org.osgi.framework.FrameworkUtil;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
67 * The class describes TopologyManager which is the central repository of the
68 * network topology. It provides service for applications to interact with
69 * topology database and notifies all the listeners of topology changes.
71 public class TopologyManagerImpl implements
72 ICacheUpdateAware<Object, Object>,
74 IConfigurationContainerAware,
78 static final String TOPOEDGESDB = "topologymanager.edgesDB";
79 static final String TOPOHOSTSDB = "topologymanager.hostsDB";
80 static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
81 static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
82 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
83 private ITopologyService topoService;
84 private IClusterContainerServices clusterContainerService;
85 private ISwitchManager switchManager;
86 // DB of all the Edges with properties which constitute our topology
87 private ConcurrentMap<Edge, Set<Property>> edgesDB;
88 // DB of all NodeConnector which are part of ISL Edges, meaning they
89 // are connected to another NodeConnector on the other side of an ISL link.
90 // NodeConnector of a Production Edge is not part of this DB.
91 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
92 // DB of all the NodeConnectors with an Host attached to it
93 private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
94 // Topology Manager Aware listeners
95 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
96 // Topology Manager Aware listeners - for clusterwide updates
97 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
98 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
99 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
100 private String userLinksFileName;
101 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
102 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
103 private volatile Boolean shuttingDown = false;
104 private Thread notifyThread;
107 void nonClusterObjectCreate() {
108 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
109 hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
110 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
111 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
114 void setTopologyManagerAware(ITopologyManagerAware s) {
115 if (this.topologyManagerAware != null) {
116 log.debug("Adding ITopologyManagerAware: {}", s);
117 this.topologyManagerAware.add(s);
121 void unsetTopologyManagerAware(ITopologyManagerAware s) {
122 if (this.topologyManagerAware != null) {
123 log.debug("Removing ITopologyManagerAware: {}", s);
124 this.topologyManagerAware.remove(s);
128 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
129 if (this.topologyManagerClusterWideAware != null) {
130 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
131 this.topologyManagerClusterWideAware.add(s);
135 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
136 if (this.topologyManagerClusterWideAware != null) {
137 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
138 this.topologyManagerClusterWideAware.remove(s);
142 void setTopoService(ITopologyService s) {
143 log.debug("Adding ITopologyService: {}", s);
144 this.topoService = s;
147 void unsetTopoService(ITopologyService s) {
148 if (this.topoService == s) {
149 log.debug("Removing ITopologyService: {}", s);
150 this.topoService = null;
154 void setClusterContainerService(IClusterContainerServices s) {
155 log.debug("Cluster Service set");
156 this.clusterContainerService = s;
159 void unsetClusterContainerService(IClusterContainerServices s) {
160 if (this.clusterContainerService == s) {
161 log.debug("Cluster Service removed!");
162 this.clusterContainerService = null;
166 void setSwitchManager(ISwitchManager s) {
167 log.debug("Adding ISwitchManager: {}", s);
168 this.switchManager = s;
171 void unsetSwitchManager(ISwitchManager s) {
172 if (this.switchManager == s) {
173 log.debug("Removing ISwitchManager: {}", s);
174 this.switchManager = null;
179 * Function called by the dependency manager when all the required
180 * dependencies are satisfied
183 void init(Component c) {
186 String containerName = null;
187 Dictionary<?, ?> props = c.getServiceProperties();
189 containerName = (String) props.get("containerName");
191 // In the Global instance case the containerName is empty
192 containerName = "UNKNOWN";
195 userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
196 registerWithOSGIConsole();
198 // Restore the shuttingDown status on init of the component
199 shuttingDown = false;
200 notifyThread = new Thread(new TopologyNotify(notifyQ));
203 @SuppressWarnings({ "unchecked" })
204 private void allocateCaches() {
207 (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
208 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
209 } catch (CacheExistException cee) {
210 log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
211 } catch (CacheConfigException cce) {
212 log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
217 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
218 TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
219 } catch (CacheExistException cee) {
220 log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
221 } catch (CacheConfigException cce) {
222 log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
226 this.nodeConnectorsDB =
227 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
228 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
229 } catch (CacheExistException cee) {
230 log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
231 } catch (CacheConfigException cce) {
232 log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
237 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
238 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
239 } catch (CacheExistException cee) {
240 log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
241 } catch (CacheConfigException cce) {
242 log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
246 @SuppressWarnings({ "unchecked" })
247 private void retrieveCaches() {
248 if (this.clusterContainerService == null) {
249 log.error("Cluster Services is null, can't retrieve caches.");
253 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
254 if (edgesDB == null) {
255 log.error("Failed to get cache for " + TOPOEDGESDB);
259 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
260 if (hostsDB == null) {
261 log.error("Failed to get cache for " + TOPOHOSTSDB);
264 this.nodeConnectorsDB =
265 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
266 if (nodeConnectorsDB == null) {
267 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
271 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
272 if (userLinksDB == null) {
273 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
278 * Function called after the topology manager has registered the service in
279 * OSGi service registry.
283 // Start the batcher thread for the cluster wide topology updates
284 notifyThread.start();
285 // SollicitRefresh MUST be called here else if called at init
286 // time it may sollicit refresh too soon.
287 log.debug("Sollicit topology refresh");
288 topoService.sollicitRefresh();
293 notifyThread.interrupt();
297 * Function called by the dependency manager when at least one dependency
298 * become unsatisfied or when the component is shutting down because for
299 * example bundle is being stopped.
307 @SuppressWarnings("unchecked")
308 private void loadConfiguration() {
309 ObjectReader objReader = new ObjectReader();
310 ConcurrentMap<String, TopologyUserLinkConfig> confList =
311 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
313 if (confList != null) {
314 for (TopologyUserLinkConfig conf : confList.values()) {
321 public Status saveConfig() {
322 return saveConfigInternal();
325 public Status saveConfigInternal() {
326 ObjectWriter objWriter = new ObjectWriter();
328 Status saveStatus = objWriter.write(
329 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
331 if (! saveStatus.isSuccess()) {
332 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
338 public Map<Node, Set<Edge>> getNodeEdges() {
339 if (this.edgesDB == null) {
343 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
344 for (Edge edge : this.edgesDB.keySet()) {
345 // Lets analyze the tail
346 Node node = edge.getTailNodeConnector().getNode();
347 Set<Edge> nodeEdges = res.get(node);
348 if (nodeEdges == null) {
349 nodeEdges = new HashSet<Edge>();
350 res.put(node, nodeEdges);
354 // Lets analyze the head
355 node = edge.getHeadNodeConnector().getNode();
356 nodeEdges = res.get(node);
357 if (nodeEdges == null) {
358 nodeEdges = new HashSet<Edge>();
359 res.put(node, nodeEdges);
368 public boolean isInternal(NodeConnector p) {
369 if (this.nodeConnectorsDB == null) {
373 // This is an internal NodeConnector if is connected to
374 // another Node i.e it's part of the nodeConnectorsDB
375 return (this.nodeConnectorsDB.get(p) != null);
379 * This method returns true if the edge is an ISL link.
383 * @return true if it is an ISL link
385 public boolean isISLink(Edge e) {
386 return (!isProductionLink(e));
390 * This method returns true if the edge is a production link.
394 * @return true if it is a production link
396 public boolean isProductionLink(Edge e) {
397 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
398 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
402 * The Map returned is a copy of the current topology hence if the topology
403 * changes the copy doesn't
405 * @return A Map representing the current topology expressed as edges of the
409 public Map<Edge, Set<Property>> getEdges() {
410 if (this.edgesDB == null) {
414 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
416 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
417 // Sets of props are copied because the composition of
418 // those properties could change with time
419 props = new HashSet<Property>(edgeEntry.getValue());
420 // We can simply reuse the key because the object is
421 // immutable so doesn't really matter that we are
422 // referencing the only owned by a different table, the
423 // meaning is the same because doesn't change with time.
424 edgeMap.put(edgeEntry.getKey(), props);
431 public Set<NodeConnector> getNodeConnectorWithHost() {
432 if (this.hostsDB == null) {
436 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
440 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
441 if (this.hostsDB == null) {
444 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
446 Set<NodeConnector> portSet;
447 for (NodeConnector nc : this.hostsDB.keySet()) {
449 portSet = res.get(node);
450 if (portSet == null) {
451 // Create the HashSet if null
452 portSet = new HashSet<NodeConnector>();
453 res.put(node, portSet);
456 // Keep updating the HashSet, given this is not a
457 // clustered map we can just update the set without
458 // worrying to update the hashmap.
466 public Host getHostAttachedToNodeConnector(NodeConnector port) {
467 List<Host> hosts = getHostsAttachedToNodeConnector(port);
468 if(hosts != null && !hosts.isEmpty()){
475 public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
476 Set<ImmutablePair<Host, Set<Property>>> hosts;
477 if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
480 // create a list of hosts
481 List<Host> retHosts = new LinkedList<Host>();
482 for(ImmutablePair<Host, Set<Property>> host : hosts) {
483 retHosts.add(host.getLeft());
489 public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
491 // Clone the property set in case non null else just
492 // create an empty one. Caches allocated via infinispan
493 // don't allow null values
495 props = new HashSet<Property>();
497 props = new HashSet<Property>(props);
499 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
502 Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
503 if(hostSet == null) {
504 hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
509 hostSet.add(thisHost);
510 this.hostsDB.put(port, hostSet);
513 hostSet.remove(thisHost);
514 if(hostSet.isEmpty()) {
515 //remove only if hasn't been concurrently modified
516 this.hostsDB.remove(port, hostSet);
518 this.hostsDB.put(port, hostSet);
524 private boolean headNodeConnectorExist(Edge e) {
526 * Only check the head end point which is supposed to be part of a
527 * network node we control (present in our inventory). If we checked the
528 * tail end point as well, we would not store the edges that connect to
529 * a non sdn enable port on a non sdn capable production switch. We want
530 * to be able to see these switches on the topology.
532 NodeConnector head = e.getHeadNodeConnector();
533 return (switchManager.doesNodeConnectorExist(head));
536 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
539 // Avoid redundant update as notifications trigger expensive tasks
540 if (edgesDB.containsKey(e)) {
541 log.trace("Skipping redundant edge addition: {}", e);
545 // Ensure that head node connector exists
546 if (!headNodeConnectorExist(e)) {
547 log.warn("Ignore edge that contains invalid node connector: {}", e);
551 // Make sure the props are non-null
553 props = new HashSet<Property>();
555 props = new HashSet<Property>(props);
558 //in case of node switch-over to a different cluster controller,
559 //let's retain edge props
560 Set<Property> currentProps = this.edgesDB.get(e);
561 if (currentProps != null){
562 props.addAll(currentProps);
565 // Now make sure there is the creation timestamp for the
566 // edge, if not there, stamp with the first update
567 boolean found_create = false;
568 for (Property prop : props) {
569 if (prop instanceof TimeStamp) {
570 TimeStamp t = (TimeStamp) prop;
571 if (t.getTimeStampName().equals("creation")) {
579 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
583 // Now add this in the database eventually overriding
584 // something that may have been already existing
585 this.edgesDB.put(e, props);
587 // Now populate the DB of NodeConnectors
588 // NOTE WELL: properties are empty sets, not really needed
590 // The DB only contains ISL ports
592 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
593 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
595 log.trace("Edge {} {}", e.toString(), type.name());
598 // Now remove the edge from edgesDB
599 this.edgesDB.remove(e);
601 // Now lets update the NodeConnectors DB, the assumption
602 // here is that two NodeConnector are exclusively
603 // connected by 1 and only 1 edge, this is reasonable in
604 // the same plug (virtual of phisical) we can assume two
605 // cables won't be plugged. This could break only in case
606 // of devices in the middle that acts as hubs, but it
607 // should be safe to assume that won't happen.
608 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
609 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
610 log.trace("Edge {} {}", e.toString(), type.name());
613 Set<Property> oldProps = this.edgesDB.get(e);
615 // When property changes lets make sure we can change it
616 // all except the creation time stamp because that should
617 // be changed only when the edge is destroyed and created
619 TimeStamp timeStamp = null;
620 for (Property prop : oldProps) {
621 if (prop instanceof TimeStamp) {
622 TimeStamp tsProp = (TimeStamp) prop;
623 if (tsProp.getTimeStampName().equals("creation")) {
630 // Now lets make sure new properties are non-null
632 props = new HashSet<Property>();
634 // Copy the set so noone is going to change the content
635 props = new HashSet<Property>(props);
638 // Now lets remove the creation property if exist in the
640 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
641 Property prop = i.next();
642 if (prop instanceof TimeStamp) {
643 TimeStamp t = (TimeStamp) prop;
644 if (t.getTimeStampName().equals("creation")) {
651 // Now lets add the creation timestamp in it
652 if (timeStamp != null) {
653 props.add(timeStamp);
657 this.edgesDB.put(e, props);
658 log.trace("Edge {} {}", e.toString(), type.name());
661 return new TopoEdgeUpdate(e, props, type);
665 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
666 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
667 for (int i = 0; i < topoedgeupdateList.size(); i++) {
668 Edge e = topoedgeupdateList.get(i).getEdge();
669 Set<Property> p = topoedgeupdateList.get(i).getProperty();
670 UpdateType type = topoedgeupdateList.get(i).getUpdateType();
671 TopoEdgeUpdate teu = edgeUpdate(e, type, p);
677 if (!teuList.isEmpty()) {
678 // Now update the listeners
679 for (ITopologyManagerAware s : this.topologyManagerAware) {
681 s.edgeUpdate(teuList);
682 } catch (Exception exc) {
683 log.error("Exception on edge update:", exc);
689 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
690 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
691 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
692 return getLinkTuple(rLink);
696 private Edge getLinkTuple(TopologyUserLinkConfig link) {
697 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
698 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
700 return new Edge(srcNodeConnector, dstNodeConnector);
701 } catch (Exception e) {
707 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
708 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
712 public Status addUserLink(TopologyUserLinkConfig userLink) {
713 if (!userLink.isValid()) {
714 return new Status(StatusCode.BADREQUEST,
715 "User link configuration invalid.");
717 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
719 //Check if this link already configured
720 //NOTE: infinispan cache doesn't support Map.containsValue()
721 // (which is linear time in most ConcurrentMap impl anyway)
722 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
723 if (existingLink.equals(userLink)) {
724 return new Status(StatusCode.CONFLICT, "Link configuration exists");
727 //attempt put, if mapping for this key already existed return conflict
728 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
729 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
730 + " already exists. Please use another name");
733 Edge linkTuple = getLinkTuple(userLink);
734 if (linkTuple != null) {
735 if (!isProductionLink(linkTuple)) {
736 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
737 new HashSet<Property>());
739 userLinksDB.remove(userLink.getName());
740 return new Status(StatusCode.NOTFOUND,
741 "Link configuration contains invalid node connector: "
746 linkTuple = getReverseLinkTuple(userLink);
747 if (linkTuple != null) {
748 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
749 if (!isProductionLink(linkTuple)) {
750 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
754 return new Status(StatusCode.SUCCESS);
758 public Status deleteUserLink(String linkName) {
759 if (linkName == null) {
760 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
763 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
765 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
766 if (! isProductionLink(linkTuple)) {
767 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
770 linkTuple = getReverseLinkTuple(link);
771 if (! isProductionLink(linkTuple)) {
772 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
775 return new Status(StatusCode.SUCCESS);
778 private void registerWithOSGIConsole() {
779 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
781 bundleContext.registerService(CommandProvider.class.getName(), this,
786 public String getHelp() {
787 StringBuffer help = new StringBuffer();
788 help.append("---Topology Manager---\n");
789 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
790 help.append("\t deleteUserLink <name>\n");
791 help.append("\t printUserLink\n");
792 help.append("\t printNodeEdges\n");
793 return help.toString();
796 public void _printUserLink(CommandInterpreter ci) {
797 for (String name : this.userLinksDB.keySet()) {
798 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
799 ci.println("Name : " + name);
800 ci.println(linkConfig);
801 ci.println("Edge " + getLinkTuple(linkConfig));
802 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
806 public void _addUserLink(CommandInterpreter ci) {
807 String name = ci.nextArgument();
808 if ((name == null)) {
809 ci.println("Please enter a valid Name");
813 String ncStr1 = ci.nextArgument();
814 if (ncStr1 == null) {
815 ci.println("Please enter two node connector strings");
818 String ncStr2 = ci.nextArgument();
819 if (ncStr2 == null) {
820 ci.println("Please enter second node connector string");
824 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
826 ci.println("Invalid input node connector 1 string: " + ncStr1);
829 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
831 ci.println("Invalid input node connector 2 string: " + ncStr2);
835 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
836 ci.println(this.addUserLink(config));
839 public void _deleteUserLink(CommandInterpreter ci) {
840 String name = ci.nextArgument();
841 if ((name == null)) {
842 ci.println("Please enter a valid Name");
845 this.deleteUserLink(name);
848 public void _printNodeEdges(CommandInterpreter ci) {
849 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
850 if (nodeEdges == null) {
853 Set<Node> nodeSet = nodeEdges.keySet();
854 if (nodeSet == null) {
857 ci.println(" Node Edge");
858 for (Node node : nodeSet) {
859 Set<Edge> edgeSet = nodeEdges.get(node);
860 if (edgeSet == null) {
863 for (Edge edge : edgeSet) {
864 ci.println(node + " " + edge);
870 public Object readObject(ObjectInputStream ois)
871 throws FileNotFoundException, IOException, ClassNotFoundException {
872 return ois.readObject();
876 public Status saveConfiguration() {
881 public void edgeOverUtilized(Edge edge) {
882 log.warn("Link Utilization above normal: {}", edge);
886 public void edgeUtilBackToNormal(Edge edge) {
887 log.warn("Link Utilization back to normal: {}", edge);
890 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
891 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
892 upd.setLocal(isLocal);
897 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
898 if (cacheName.equals(TOPOEDGESDB)) {
899 // This is the case of an Edge being added to the topology DB
900 final Edge e = (Edge) key;
901 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
902 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
907 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
908 if (cacheName.equals(TOPOEDGESDB)) {
909 final Edge e = (Edge) key;
910 log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
911 final Set<Property> props = (Set<Property>) new_value;
912 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
917 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
918 if (cacheName.equals(TOPOEDGESDB)) {
919 final Edge e = (Edge) key;
920 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
921 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
925 class TopologyNotify implements Runnable {
926 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
927 private TopoEdgeUpdate entry;
928 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
929 private boolean notifyListeners;
931 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
932 this.notifyQ = notifyQ;
939 log.trace("New run of TopologyNotify");
940 notifyListeners = false;
941 // First we block waiting for an element to get in
942 entry = notifyQ.take();
943 // Then we drain the whole queue if elements are
944 // in it without getting into any blocking condition
945 for (; entry != null; entry = notifyQ.poll()) {
947 notifyListeners = true;
950 // Notify listeners only if there were updates drained else
952 if (notifyListeners) {
953 log.trace("Notifier thread, notified a listener");
954 // Now update the listeners
955 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
957 s.edgeUpdate(teuList);
958 } catch (Exception exc) {
959 log.error("Exception on edge update:", exc);
965 // Lets sleep for sometime to allow aggregation of event
967 } catch (InterruptedException e1) {
971 log.warn("TopologyNotify interrupted {}", e1.getMessage());
972 } catch (Exception e2) {