2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
23 import java.util.Map.Entry;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.CopyOnWriteArraySet;
29 import java.util.concurrent.LinkedBlockingQueue;
31 import org.apache.commons.lang3.tuple.ImmutablePair;
32 import org.apache.felix.dm.Component;
33 import org.eclipse.osgi.framework.console.CommandInterpreter;
34 import org.eclipse.osgi.framework.console.CommandProvider;
35 import org.opendaylight.controller.clustering.services.CacheConfigException;
36 import org.opendaylight.controller.clustering.services.CacheExistException;
37 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
38 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
39 import org.opendaylight.controller.clustering.services.IClusterServices;
40 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
41 import org.opendaylight.controller.sal.core.Edge;
42 import org.opendaylight.controller.sal.core.Host;
43 import org.opendaylight.controller.sal.core.Node;
44 import org.opendaylight.controller.sal.core.NodeConnector;
45 import org.opendaylight.controller.sal.core.Property;
46 import org.opendaylight.controller.sal.core.TimeStamp;
47 import org.opendaylight.controller.sal.core.UpdateType;
48 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
49 import org.opendaylight.controller.sal.topology.ITopologyService;
50 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
51 import org.opendaylight.controller.sal.utils.GlobalConstants;
52 import org.opendaylight.controller.sal.utils.IObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectReader;
54 import org.opendaylight.controller.sal.utils.ObjectWriter;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.controller.switchmanager.ISwitchManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManager;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
60 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
61 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
62 import org.osgi.framework.BundleContext;
63 import org.osgi.framework.FrameworkUtil;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
68 * The class describes TopologyManager which is the central repository of the
69 * network topology. It provides service for applications to interact with
70 * topology database and notifies all the listeners of topology changes.
72 public class TopologyManagerImpl implements
75 IConfigurationContainerAware,
79 static final String TOPOEDGESDB = "topologymanager.edgesDB";
80 static final String TOPOHOSTSDB = "topologymanager.hostsDB";
81 static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
82 static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
83 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
84 private ITopologyService topoService;
85 private IClusterContainerServices clusterContainerService;
86 private ISwitchManager switchManager;
87 // DB of all the Edges with properties which constitute our topology
88 private ConcurrentMap<Edge, Set<Property>> edgesDB;
89 // DB of all NodeConnector which are part of ISL Edges, meaning they
90 // are connected to another NodeConnector on the other side of an ISL link.
91 // NodeConnector of a Production Edge is not part of this DB.
92 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
93 // DB of all the NodeConnectors with an Host attached to it
94 private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
95 // Topology Manager Aware listeners
96 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
97 // Topology Manager Aware listeners - for clusterwide updates
98 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
99 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
100 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
101 private String userLinksFileName;
102 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
103 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
104 private volatile Boolean shuttingDown = false;
105 private Thread notifyThread;
108 void nonClusterObjectCreate() {
109 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
110 hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
111 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
112 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
115 void setTopologyManagerAware(ITopologyManagerAware s) {
116 if (this.topologyManagerAware != null) {
117 log.debug("Adding ITopologyManagerAware: {}", s);
118 this.topologyManagerAware.add(s);
119 // Reply all the known edges
120 if (this.edgesDB != null) {
121 List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
122 for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
123 existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
125 s.edgeUpdate(existingEdges);
130 void unsetTopologyManagerAware(ITopologyManagerAware s) {
131 if (this.topologyManagerAware != null) {
132 log.debug("Removing ITopologyManagerAware: {}", s);
133 this.topologyManagerAware.remove(s);
137 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
138 if (this.topologyManagerClusterWideAware != null) {
139 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
140 this.topologyManagerClusterWideAware.add(s);
141 // Reply all the known edges
142 if (this.edgesDB != null) {
143 List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
144 for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
145 existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
147 s.edgeUpdate(existingEdges);
152 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
153 if (this.topologyManagerClusterWideAware != null) {
154 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
155 this.topologyManagerClusterWideAware.remove(s);
159 void setTopoService(ITopologyService s) {
160 log.debug("Adding ITopologyService: {}", s);
161 this.topoService = s;
164 void unsetTopoService(ITopologyService s) {
165 if (this.topoService == s) {
166 log.debug("Removing ITopologyService: {}", s);
167 this.topoService = null;
171 void setClusterContainerService(IClusterContainerServices s) {
172 log.debug("Cluster Service set");
173 this.clusterContainerService = s;
176 void unsetClusterContainerService(IClusterContainerServices s) {
177 if (this.clusterContainerService == s) {
178 log.debug("Cluster Service removed!");
179 this.clusterContainerService = null;
183 void setSwitchManager(ISwitchManager s) {
184 log.debug("Adding ISwitchManager: {}", s);
185 this.switchManager = s;
188 void unsetSwitchManager(ISwitchManager s) {
189 if (this.switchManager == s) {
190 log.debug("Removing ISwitchManager: {}", s);
191 this.switchManager = null;
196 * Function called by the dependency manager when all the required
197 * dependencies are satisfied
200 void init(Component c) {
203 String containerName = null;
204 Dictionary<?, ?> props = c.getServiceProperties();
206 containerName = (String) props.get("containerName");
208 // In the Global instance case the containerName is empty
209 containerName = "UNKNOWN";
212 userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
213 registerWithOSGIConsole();
215 // Restore the shuttingDown status on init of the component
216 shuttingDown = false;
217 notifyThread = new Thread(new TopologyNotify(notifyQ));
220 @SuppressWarnings({ "unchecked", "deprecation" })
221 private void allocateCaches() {
224 (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
225 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
226 } catch (CacheExistException cee) {
227 log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
228 } catch (CacheConfigException cce) {
229 log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
234 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
235 TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
236 } catch (CacheExistException cee) {
237 log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
238 } catch (CacheConfigException cce) {
239 log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
243 this.nodeConnectorsDB =
244 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
245 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
246 } catch (CacheExistException cee) {
247 log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
248 } catch (CacheConfigException cce) {
249 log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
254 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
255 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
256 } catch (CacheExistException cee) {
257 log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
258 } catch (CacheConfigException cce) {
259 log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
263 @SuppressWarnings({ "unchecked", "deprecation" })
264 private void retrieveCaches() {
265 if (this.clusterContainerService == null) {
266 log.error("Cluster Services is null, can't retrieve caches.");
270 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
271 if (edgesDB == null) {
272 log.error("Failed to get cache for " + TOPOEDGESDB);
276 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
277 if (hostsDB == null) {
278 log.error("Failed to get cache for " + TOPOHOSTSDB);
281 this.nodeConnectorsDB =
282 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
283 if (nodeConnectorsDB == null) {
284 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
288 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
289 if (userLinksDB == null) {
290 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
295 * Function called after the topology manager has registered the service in
296 * OSGi service registry.
300 // Start the batcher thread for the cluster wide topology updates
301 notifyThread.start();
302 // SollicitRefresh MUST be called here else if called at init
303 // time it may sollicit refresh too soon.
304 log.debug("Sollicit topology refresh");
305 topoService.sollicitRefresh();
310 notifyThread.interrupt();
314 * Function called by the dependency manager when at least one dependency
315 * become unsatisfied or when the component is shutting down because for
316 * example bundle is being stopped.
324 @SuppressWarnings("unchecked")
325 private void loadConfiguration() {
326 ObjectReader objReader = new ObjectReader();
327 ConcurrentMap<String, TopologyUserLinkConfig> confList =
328 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
330 if (confList != null) {
331 for (TopologyUserLinkConfig conf : confList.values()) {
338 public Status saveConfig() {
339 return saveConfigInternal();
342 public Status saveConfigInternal() {
343 ObjectWriter objWriter = new ObjectWriter();
345 Status saveStatus = objWriter.write(
346 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
348 if (! saveStatus.isSuccess()) {
349 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
355 public Map<Node, Set<Edge>> getNodeEdges() {
356 if (this.edgesDB == null) {
360 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
361 for (Edge edge : this.edgesDB.keySet()) {
362 // Lets analyze the tail
363 Node node = edge.getTailNodeConnector().getNode();
364 Set<Edge> nodeEdges = res.get(node);
365 if (nodeEdges == null) {
366 nodeEdges = new HashSet<Edge>();
367 res.put(node, nodeEdges);
371 // Lets analyze the head
372 node = edge.getHeadNodeConnector().getNode();
373 nodeEdges = res.get(node);
374 if (nodeEdges == null) {
375 nodeEdges = new HashSet<Edge>();
376 res.put(node, nodeEdges);
385 public boolean isInternal(NodeConnector p) {
386 if (this.nodeConnectorsDB == null) {
390 // This is an internal NodeConnector if is connected to
391 // another Node i.e it's part of the nodeConnectorsDB
392 return (this.nodeConnectorsDB.get(p) != null);
396 * This method returns true if the edge is an ISL link.
400 * @return true if it is an ISL link
402 public boolean isISLink(Edge e) {
403 return (!isProductionLink(e));
407 * This method returns true if the edge is a production link.
411 * @return true if it is a production link
413 public boolean isProductionLink(Edge e) {
414 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
415 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
419 * The Map returned is a copy of the current topology hence if the topology
420 * changes the copy doesn't
422 * @return A Map representing the current topology expressed as edges of the
426 public Map<Edge, Set<Property>> getEdges() {
427 if (this.edgesDB == null) {
431 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
433 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
434 // Sets of props are copied because the composition of
435 // those properties could change with time
436 props = new HashSet<Property>(edgeEntry.getValue());
437 // We can simply reuse the key because the object is
438 // immutable so doesn't really matter that we are
439 // referencing the only owned by a different table, the
440 // meaning is the same because doesn't change with time.
441 edgeMap.put(edgeEntry.getKey(), props);
448 public Set<NodeConnector> getNodeConnectorWithHost() {
449 if (this.hostsDB == null) {
453 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
457 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
458 if (this.hostsDB == null) {
461 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
463 Set<NodeConnector> portSet;
464 for (NodeConnector nc : this.hostsDB.keySet()) {
466 portSet = res.get(node);
467 if (portSet == null) {
468 // Create the HashSet if null
469 portSet = new HashSet<NodeConnector>();
470 res.put(node, portSet);
473 // Keep updating the HashSet, given this is not a
474 // clustered map we can just update the set without
475 // worrying to update the hashmap.
483 public Host getHostAttachedToNodeConnector(NodeConnector port) {
484 List<Host> hosts = getHostsAttachedToNodeConnector(port);
485 if(hosts != null && !hosts.isEmpty()){
492 public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
493 Set<ImmutablePair<Host, Set<Property>>> hosts;
494 if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
497 // create a list of hosts
498 List<Host> retHosts = new LinkedList<Host>();
499 for(ImmutablePair<Host, Set<Property>> host : hosts) {
500 retHosts.add(host.getLeft());
506 public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
508 // Clone the property set in case non null else just
509 // create an empty one. Caches allocated via infinispan
510 // don't allow null values
512 props = new HashSet<Property>();
514 props = new HashSet<Property>(props);
516 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
519 Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
520 if(hostSet == null) {
521 hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
526 hostSet.add(thisHost);
527 this.hostsDB.put(port, hostSet);
530 hostSet.remove(thisHost);
531 if(hostSet.isEmpty()) {
532 //remove only if hasn't been concurrently modified
533 this.hostsDB.remove(port, hostSet);
535 this.hostsDB.put(port, hostSet);
541 private boolean nodeConnectorsExist(Edge e) {
542 NodeConnector head = e.getHeadNodeConnector();
543 NodeConnector tail = e.getTailNodeConnector();
544 return (switchManager.doesNodeConnectorExist(head) &&
545 switchManager.doesNodeConnectorExist(tail));
548 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
551 // Ensure that both tail and head node connectors exist.
552 if (!nodeConnectorsExist(e)) {
553 log.warn("Ignore edge that contains invalid node connector: {}", e);
557 // Make sure the props are non-null
559 props = new HashSet<Property>();
561 props = new HashSet<Property>(props);
564 //in case of node switch-over to a different cluster controller,
565 //let's retain edge props
566 Set<Property> currentProps = this.edgesDB.get(e);
567 if (currentProps != null){
568 props.addAll(currentProps);
571 // Now make sure there is the creation timestamp for the
572 // edge, if not there, stamp with the first update
573 boolean found_create = false;
574 for (Property prop : props) {
575 if (prop instanceof TimeStamp) {
576 TimeStamp t = (TimeStamp) prop;
577 if (t.getTimeStampName().equals("creation")) {
585 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
589 // Now add this in the database eventually overriding
590 // something that may have been already existing
591 this.edgesDB.put(e, props);
593 // Now populate the DB of NodeConnectors
594 // NOTE WELL: properties are empty sets, not really needed
596 // The DB only contains ISL ports
598 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
599 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
601 log.trace("Edge {} {}", e.toString(), type.name());
604 // Now remove the edge from edgesDB
605 this.edgesDB.remove(e);
607 // Now lets update the NodeConnectors DB, the assumption
608 // here is that two NodeConnector are exclusively
609 // connected by 1 and only 1 edge, this is reasonable in
610 // the same plug (virtual of phisical) we can assume two
611 // cables won't be plugged. This could break only in case
612 // of devices in the middle that acts as hubs, but it
613 // should be safe to assume that won't happen.
614 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
615 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
616 log.trace("Edge {} {}", e.toString(), type.name());
619 Set<Property> oldProps = this.edgesDB.get(e);
621 // When property changes lets make sure we can change it
622 // all except the creation time stamp because that should
623 // be changed only when the edge is destroyed and created
625 TimeStamp timeStamp = null;
626 for (Property prop : oldProps) {
627 if (prop instanceof TimeStamp) {
628 TimeStamp tsProp = (TimeStamp) prop;
629 if (tsProp.getTimeStampName().equals("creation")) {
636 // Now lets make sure new properties are non-null
638 props = new HashSet<Property>();
640 // Copy the set so noone is going to change the content
641 props = new HashSet<Property>(props);
644 // Now lets remove the creation property if exist in the
646 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
647 Property prop = i.next();
648 if (prop instanceof TimeStamp) {
649 TimeStamp t = (TimeStamp) prop;
650 if (t.getTimeStampName().equals("creation")) {
657 // Now lets add the creation timestamp in it
658 if (timeStamp != null) {
659 props.add(timeStamp);
663 this.edgesDB.put(e, props);
664 log.trace("Edge {} {}", e.toString(), type.name());
667 return new TopoEdgeUpdate(e, props, type);
671 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
672 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
673 for (int i = 0; i < topoedgeupdateList.size(); i++) {
674 Edge e = topoedgeupdateList.get(i).getEdge();
675 Set<Property> p = topoedgeupdateList.get(i).getProperty();
676 UpdateType type = topoedgeupdateList.get(i).getUpdateType();
677 TopoEdgeUpdate teu = edgeUpdate(e, type, p);
683 if (!teuList.isEmpty()) {
684 // Now update the listeners
685 for (ITopologyManagerAware s : this.topologyManagerAware) {
687 s.edgeUpdate(teuList);
688 } catch (Exception exc) {
689 log.error("Exception on edge update:", exc);
695 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
696 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
697 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
698 return getLinkTuple(rLink);
702 private Edge getLinkTuple(TopologyUserLinkConfig link) {
703 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
704 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
706 return new Edge(srcNodeConnector, dstNodeConnector);
707 } catch (Exception e) {
713 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
714 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
718 public Status addUserLink(TopologyUserLinkConfig userLink) {
719 if (!userLink.isValid()) {
720 return new Status(StatusCode.BADREQUEST,
721 "User link configuration invalid.");
723 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
725 //Check if this link already configured
726 //NOTE: infinispan cache doesn't support Map.containsValue()
727 // (which is linear time in most ConcurrentMap impl anyway)
728 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
729 if (existingLink.equals(userLink)) {
730 return new Status(StatusCode.CONFLICT, "Link configuration exists");
733 //attempt put, if mapping for this key already existed return conflict
734 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
735 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
736 + " already exists. Please use another name");
739 Edge linkTuple = getLinkTuple(userLink);
740 if (linkTuple != null) {
741 if (!isProductionLink(linkTuple)) {
742 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
743 new HashSet<Property>());
745 userLinksDB.remove(userLink.getName());
746 return new Status(StatusCode.NOTFOUND,
747 "Link configuration contains invalid node connector: "
752 linkTuple = getReverseLinkTuple(userLink);
753 if (linkTuple != null) {
754 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
755 if (!isProductionLink(linkTuple)) {
756 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
760 return new Status(StatusCode.SUCCESS);
764 public Status deleteUserLink(String linkName) {
765 if (linkName == null) {
766 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
769 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
771 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
772 if (! isProductionLink(linkTuple)) {
773 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
776 linkTuple = getReverseLinkTuple(link);
777 if (! isProductionLink(linkTuple)) {
778 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
781 return new Status(StatusCode.SUCCESS);
784 private void registerWithOSGIConsole() {
785 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
787 bundleContext.registerService(CommandProvider.class.getName(), this,
792 public String getHelp() {
793 StringBuffer help = new StringBuffer();
794 help.append("---Topology Manager---\n");
795 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
796 help.append("\t deleteUserLink <name>\n");
797 help.append("\t printUserLink\n");
798 help.append("\t printNodeEdges\n");
799 return help.toString();
802 public void _printUserLink(CommandInterpreter ci) {
803 for (String name : this.userLinksDB.keySet()) {
804 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
805 ci.println("Name : " + name);
806 ci.println(linkConfig);
807 ci.println("Edge " + getLinkTuple(linkConfig));
808 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
812 public void _addUserLink(CommandInterpreter ci) {
813 String name = ci.nextArgument();
814 if ((name == null)) {
815 ci.println("Please enter a valid Name");
819 String ncStr1 = ci.nextArgument();
820 if (ncStr1 == null) {
821 ci.println("Please enter two node connector strings");
824 String ncStr2 = ci.nextArgument();
825 if (ncStr2 == null) {
826 ci.println("Please enter second node connector string");
830 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
832 ci.println("Invalid input node connector 1 string: " + ncStr1);
835 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
837 ci.println("Invalid input node connector 2 string: " + ncStr2);
841 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
842 ci.println(this.addUserLink(config));
845 public void _deleteUserLink(CommandInterpreter ci) {
846 String name = ci.nextArgument();
847 if ((name == null)) {
848 ci.println("Please enter a valid Name");
851 this.deleteUserLink(name);
854 public void _printNodeEdges(CommandInterpreter ci) {
855 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
856 if (nodeEdges == null) {
859 Set<Node> nodeSet = nodeEdges.keySet();
860 if (nodeSet == null) {
863 ci.println(" Node Edge");
864 for (Node node : nodeSet) {
865 Set<Edge> edgeSet = nodeEdges.get(node);
866 if (edgeSet == null) {
869 for (Edge edge : edgeSet) {
870 ci.println(node + " " + edge);
876 public Object readObject(ObjectInputStream ois)
877 throws FileNotFoundException, IOException, ClassNotFoundException {
878 return ois.readObject();
882 public Status saveConfiguration() {
887 public void edgeOverUtilized(Edge edge) {
888 log.warn("Link Utilization above normal: {}", edge);
892 public void edgeUtilBackToNormal(Edge edge) {
893 log.warn("Link Utilization back to normal: {}", edge);
896 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
897 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
898 upd.setLocal(isLocal);
903 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
904 if (cacheName.equals(TOPOEDGESDB)) {
905 // This is the case of an Edge being added to the topology DB
906 final Edge e = (Edge) key;
907 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
908 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
913 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
914 if (cacheName.equals(TOPOEDGESDB)) {
915 final Edge e = (Edge) key;
916 log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
917 final Set<Property> props = (Set<Property>) new_value;
918 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
923 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
924 if (cacheName.equals(TOPOEDGESDB)) {
925 final Edge e = (Edge) key;
926 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
927 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
931 class TopologyNotify implements Runnable {
932 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
933 private TopoEdgeUpdate entry;
934 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
935 private boolean notifyListeners;
937 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
938 this.notifyQ = notifyQ;
945 log.trace("New run of TopologyNotify");
946 notifyListeners = false;
947 // First we block waiting for an element to get in
948 entry = notifyQ.take();
949 // Then we drain the whole queue if elements are
950 // in it without getting into any blocking condition
951 for (; entry != null; entry = notifyQ.poll()) {
953 notifyListeners = true;
956 // Notify listeners only if there were updates drained else
958 if (notifyListeners) {
959 log.trace("Notifier thread, notified a listener");
960 // Now update the listeners
961 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
963 s.edgeUpdate(teuList);
964 } catch (Exception exc) {
965 log.error("Exception on edge update:", exc);
971 // Lets sleep for sometime to allow aggregation of event
973 } catch (InterruptedException e1) {
977 log.warn("TopologyNotify interrupted {}", e1.getMessage());
978 } catch (Exception e2) {