2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import org.apache.commons.lang3.tuple.ImmutablePair;
12 import org.apache.felix.dm.Component;
13 import org.eclipse.osgi.framework.console.CommandInterpreter;
14 import org.eclipse.osgi.framework.console.CommandProvider;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.configuration.ConfigurationObject;
21 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
22 import org.opendaylight.controller.configuration.IConfigurationContainerService;
23 import org.opendaylight.controller.sal.core.Edge;
24 import org.opendaylight.controller.sal.core.Host;
25 import org.opendaylight.controller.sal.core.Node;
26 import org.opendaylight.controller.sal.core.NodeConnector;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.controller.sal.core.TimeStamp;
29 import org.opendaylight.controller.sal.core.UpdateType;
30 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
31 import org.opendaylight.controller.sal.topology.ITopologyService;
32 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
33 import org.opendaylight.controller.sal.utils.IObjectReader;
34 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
35 import org.opendaylight.controller.sal.utils.Status;
36 import org.opendaylight.controller.sal.utils.StatusCode;
37 import org.opendaylight.controller.switchmanager.ISwitchManager;
38 import org.opendaylight.controller.topologymanager.ITopologyManager;
39 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
40 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
41 import org.opendaylight.controller.topologymanager.ITopologyManagerShell;
42 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
43 import org.osgi.framework.BundleContext;
44 import org.osgi.framework.FrameworkUtil;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import java.io.FileNotFoundException;
49 import java.io.IOException;
50 import java.io.ObjectInputStream;
51 import java.util.ArrayList;
52 import java.util.Dictionary;
53 import java.util.EnumSet;
54 import java.util.HashMap;
55 import java.util.HashSet;
56 import java.util.Iterator;
57 import java.util.LinkedList;
58 import java.util.List;
61 import java.util.concurrent.BlockingQueue;
62 import java.util.concurrent.ConcurrentHashMap;
63 import java.util.concurrent.ConcurrentMap;
64 import java.util.concurrent.CopyOnWriteArraySet;
65 import java.util.concurrent.LinkedBlockingQueue;
68 * The class describes TopologyManager which is the central repository of the
69 * network topology. It provides service for applications to interact with
70 * topology database and notifies all the listeners of topology changes.
72 public class TopologyManagerImpl implements
73 ICacheUpdateAware<Object, Object>,
75 ITopologyManagerShell,
76 IConfigurationContainerAware,
80 protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
81 protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
82 protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
83 protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
84 private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
85 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
86 private ITopologyService topoService;
87 private IClusterContainerServices clusterContainerService;
88 private IConfigurationContainerService configurationService;
89 private ISwitchManager switchManager;
90 // DB of all the Edges with properties which constitute our topology
91 private ConcurrentMap<Edge, Set<Property>> edgesDB;
92 // DB of all NodeConnector which are part of ISL Edges, meaning they
93 // are connected to another NodeConnector on the other side of an ISL link.
94 // NodeConnector of a Production Edge is not part of this DB.
95 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
96 // DB of all the NodeConnectors with an Host attached to it
97 private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
98 // Topology Manager Aware listeners
99 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
100 // Topology Manager Aware listeners - for clusterwide updates
101 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
102 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
103 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
104 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
105 private volatile Boolean shuttingDown = false;
106 private Thread notifyThread;
109 void nonClusterObjectCreate() {
110 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
111 hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
112 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
113 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
116 void setTopologyManagerAware(ITopologyManagerAware s) {
117 if (this.topologyManagerAware != null) {
118 log.debug("Adding ITopologyManagerAware: {}", s);
119 this.topologyManagerAware.add(s);
123 void unsetTopologyManagerAware(ITopologyManagerAware s) {
124 if (this.topologyManagerAware != null) {
125 log.debug("Removing ITopologyManagerAware: {}", s);
126 this.topologyManagerAware.remove(s);
130 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
131 if (this.topologyManagerClusterWideAware != null) {
132 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
133 this.topologyManagerClusterWideAware.add(s);
137 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
138 if (this.topologyManagerClusterWideAware != null) {
139 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
140 this.topologyManagerClusterWideAware.remove(s);
144 void setTopoService(ITopologyService s) {
145 log.debug("Adding ITopologyService: {}", s);
146 this.topoService = s;
149 void unsetTopoService(ITopologyService s) {
150 if (this.topoService == s) {
151 log.debug("Removing ITopologyService: {}", s);
152 this.topoService = null;
156 void setClusterContainerService(IClusterContainerServices s) {
157 log.debug("Cluster Service set");
158 this.clusterContainerService = s;
161 void unsetClusterContainerService(IClusterContainerServices s) {
162 if (this.clusterContainerService == s) {
163 log.debug("Cluster Service removed!");
164 this.clusterContainerService = null;
168 public void setConfigurationContainerService(IConfigurationContainerService service) {
169 log.trace("Got configuration service set request {}", service);
170 this.configurationService = service;
173 public void unsetConfigurationContainerService(IConfigurationContainerService service) {
174 log.trace("Got configuration service UNset request");
175 this.configurationService = null;
178 void setSwitchManager(ISwitchManager s) {
179 log.debug("Adding ISwitchManager: {}", s);
180 this.switchManager = s;
183 void unsetSwitchManager(ISwitchManager s) {
184 if (this.switchManager == s) {
185 log.debug("Removing ISwitchManager: {}", s);
186 this.switchManager = null;
191 * Function called by the dependency manager when all the required
192 * dependencies are satisfied
195 void init(Component c) {
198 String containerName = null;
199 Dictionary<?, ?> props = c.getServiceProperties();
201 containerName = (String) props.get("containerName");
203 // In the Global instance case the containerName is empty
204 containerName = "UNKNOWN";
207 registerWithOSGIConsole();
210 // Restore the shuttingDown status on init of the component
211 shuttingDown = false;
212 notifyThread = new Thread(new TopologyNotify(notifyQ));
215 @SuppressWarnings({ "unchecked" })
216 private void allocateCaches() {
218 (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
221 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
223 this.nodeConnectorsDB =
224 (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
225 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
227 (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
228 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
231 private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
232 ConcurrentMap<?, ?> cache = null;
234 cache = this.clusterContainerService.createCache(cacheName, cacheModes);
235 } catch (CacheExistException e) {
236 log.debug(cacheName + " cache already exists - destroy and recreate if needed");
237 } catch (CacheConfigException e) {
238 log.error(cacheName + " cache configuration invalid - check cache mode");
243 @SuppressWarnings({ "unchecked" })
244 private void retrieveCaches() {
245 if (this.clusterContainerService == null) {
246 log.error("Cluster Services is null, can't retrieve caches.");
250 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
251 if (edgesDB == null) {
252 log.error("Failed to get cache for " + TOPOEDGESDB);
256 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
257 if (hostsDB == null) {
258 log.error("Failed to get cache for " + TOPOHOSTSDB);
261 this.nodeConnectorsDB =
262 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
263 if (nodeConnectorsDB == null) {
264 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
268 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
269 if (userLinksDB == null) {
270 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
275 * Function called after the topology manager has registered the service in
276 * OSGi service registry.
280 // Start the batcher thread for the cluster wide topology updates
281 notifyThread.start();
282 // SollicitRefresh MUST be called here else if called at init
283 // time it may sollicit refresh too soon.
284 log.debug("Sollicit topology refresh");
285 topoService.sollicitRefresh();
290 notifyThread.interrupt();
294 * Function called by the dependency manager when at least one dependency
295 * become unsatisfied or when the component is shutting down because for
296 * example bundle is being stopped.
304 private void loadConfiguration() {
305 for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
306 addUserLink((TopologyUserLinkConfig) conf);
311 public Status saveConfig() {
312 return saveConfigInternal();
315 public Status saveConfigInternal() {
316 Status saveStatus = configurationService.persistConfiguration(
317 new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
319 if (!saveStatus.isSuccess()) {
320 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
326 public Map<Node, Set<Edge>> getNodeEdges() {
327 if (this.edgesDB == null) {
331 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
332 for (Edge edge : this.edgesDB.keySet()) {
333 // Lets analyze the tail
334 Node node = edge.getTailNodeConnector().getNode();
335 Set<Edge> nodeEdges = res.get(node);
336 if (nodeEdges == null) {
337 nodeEdges = new HashSet<Edge>();
338 res.put(node, nodeEdges);
342 // Lets analyze the head
343 node = edge.getHeadNodeConnector().getNode();
344 nodeEdges = res.get(node);
345 if (nodeEdges == null) {
346 nodeEdges = new HashSet<Edge>();
347 res.put(node, nodeEdges);
356 public boolean isInternal(NodeConnector p) {
357 if (this.nodeConnectorsDB == null) {
361 // This is an internal NodeConnector if is connected to
362 // another Node i.e it's part of the nodeConnectorsDB
363 return (this.nodeConnectorsDB.get(p) != null);
367 * This method returns true if the edge is an ISL link.
371 * @return true if it is an ISL link
373 public boolean isISLink(Edge e) {
374 return (!isProductionLink(e));
378 * This method returns true if the edge is a production link.
382 * @return true if it is a production link
384 public boolean isProductionLink(Edge e) {
385 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
386 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
390 * This method cross checks the determination of nodeConnector type by Discovery Service
391 * against the information in SwitchManager and updates it accordingly.
395 private void crossCheckNodeConnectors(Edge e) {
397 if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
398 nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
400 e.setHeadNodeConnector(nc);
403 if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
404 nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
406 e.setTailNodeConnector(nc);
412 * A NodeConnector may have been categorized as of type Production by Discovery Service.
413 * But at the time when this determination was made, only OF nodes were known to Discovery
414 * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
415 * so, then it returns a new NodeConnector with correct type.
418 * NodeConnector as passed on in the edge
420 * If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
421 * with correct type, null otherwise
424 private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
426 for (Node node : switchManager.getNodes()) {
427 String nodeName = node.getNodeIDString();
428 log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
429 nc.getNode().getNodeIDString());
430 if (nodeName.equals(nc.getNode().getNodeIDString())) {
431 NodeConnector nodeConnector = NodeConnectorCreator
432 .createNodeConnector(node.getType(), nc.getID(), node);
433 return nodeConnector;
440 * The Map returned is a copy of the current topology hence if the topology
441 * changes the copy doesn't
443 * @return A Map representing the current topology expressed as edges of the
447 public Map<Edge, Set<Property>> getEdges() {
448 if (this.edgesDB == null) {
452 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
454 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
455 // Sets of props are copied because the composition of
456 // those properties could change with time
457 props = new HashSet<Property>(edgeEntry.getValue());
458 // We can simply reuse the key because the object is
459 // immutable so doesn't really matter that we are
460 // referencing the only owned by a different table, the
461 // meaning is the same because doesn't change with time.
462 edgeMap.put(edgeEntry.getKey(), props);
469 public Set<NodeConnector> getNodeConnectorWithHost() {
470 if (this.hostsDB == null) {
474 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
478 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
479 if (this.hostsDB == null) {
482 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
484 Set<NodeConnector> portSet;
485 for (NodeConnector nc : this.hostsDB.keySet()) {
487 portSet = res.get(node);
488 if (portSet == null) {
489 // Create the HashSet if null
490 portSet = new HashSet<NodeConnector>();
491 res.put(node, portSet);
494 // Keep updating the HashSet, given this is not a
495 // clustered map we can just update the set without
496 // worrying to update the hashmap.
504 public Host getHostAttachedToNodeConnector(NodeConnector port) {
505 List<Host> hosts = getHostsAttachedToNodeConnector(port);
506 if(hosts != null && !hosts.isEmpty()){
513 public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
514 Set<ImmutablePair<Host, Set<Property>>> hosts;
515 if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
518 // create a list of hosts
519 List<Host> retHosts = new LinkedList<Host>();
520 for(ImmutablePair<Host, Set<Property>> host : hosts) {
521 retHosts.add(host.getLeft());
527 public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
529 // Clone the property set in case non null else just
530 // create an empty one. Caches allocated via infinispan
531 // don't allow null values
533 props = new HashSet<Property>();
535 props = new HashSet<Property>(props);
537 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
540 Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
541 if(hostSet == null) {
542 hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
547 hostSet.add(thisHost);
548 this.hostsDB.put(port, hostSet);
551 hostSet.remove(thisHost);
552 if(hostSet.isEmpty()) {
553 //remove only if hasn't been concurrently modified
554 this.hostsDB.remove(port, hostSet);
556 this.hostsDB.put(port, hostSet);
562 private boolean headNodeConnectorExist(Edge e) {
564 * Only check the head end point which is supposed to be part of a
565 * network node we control (present in our inventory). If we checked the
566 * tail end point as well, we would not store the edges that connect to
567 * a non sdn enable port on a non sdn capable production switch. We want
568 * to be able to see these switches on the topology.
570 NodeConnector head = e.getHeadNodeConnector();
571 return (switchManager.doesNodeConnectorExist(head));
574 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
579 if (this.edgesDB.containsKey(e)) {
580 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
581 log.trace("Skipping redundant edge addition: {}", e);
585 // Make sure the props are non-null or create a copy
587 props = new HashSet<Property>();
589 props = new HashSet<Property>(props);
593 // Ensure that head node connector exists
594 if (!headNodeConnectorExist(e)) {
595 log.warn("Ignore edge that contains invalid node connector: {}", e);
599 // Check if nodeConnectors of the edge were correctly categorized
600 // by protocol plugin
601 crossCheckNodeConnectors(e);
603 // Now make sure there is the creation timestamp for the
604 // edge, if not there, stamp with the first update
605 boolean found_create = false;
606 for (Property prop : props) {
607 if (prop instanceof TimeStamp) {
608 TimeStamp t = (TimeStamp) prop;
609 if (t.getTimeStampName().equals("creation")) {
617 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
621 // Now add this in the database eventually overriding
622 // something that may have been already existing
623 this.edgesDB.put(e, props);
625 // Now populate the DB of NodeConnectors
626 // NOTE WELL: properties are empty sets, not really needed
628 // The DB only contains ISL ports
630 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
631 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
633 log.trace("Edge {} {}", e.toString(), type.name());
636 // Now remove the edge from edgesDB
637 this.edgesDB.remove(e);
639 // Now lets update the NodeConnectors DB, the assumption
640 // here is that two NodeConnector are exclusively
641 // connected by 1 and only 1 edge, this is reasonable in
642 // the same plug (virtual of phisical) we can assume two
643 // cables won't be plugged. This could break only in case
644 // of devices in the middle that acts as hubs, but it
645 // should be safe to assume that won't happen.
646 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
647 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
648 log.trace("Edge {} {}", e.toString(), type.name());
651 Set<Property> oldProps = this.edgesDB.get(e);
653 // When property(s) changes lets make sure we can change it
654 // all except the creation time stamp because that should
655 // be set only when the edge is created
656 TimeStamp timeStamp = null;
657 if (oldProps != null) {
658 for (Property prop : oldProps) {
659 if (prop instanceof TimeStamp) {
660 TimeStamp tsProp = (TimeStamp) prop;
661 if (tsProp.getTimeStampName().equals("creation")) {
669 // Now lets make sure new properties are non-null
671 props = new HashSet<Property>();
673 // Copy the set so noone is going to change the content
674 props = new HashSet<Property>(props);
677 // Now lets remove the creation property if exist in the
679 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
680 Property prop = i.next();
681 if (prop instanceof TimeStamp) {
682 TimeStamp t = (TimeStamp) prop;
683 if (t.getTimeStampName().equals("creation")) {
684 if (timeStamp != null) {
692 // Now lets add the creation timestamp in it
693 if (timeStamp != null) {
694 props.add(timeStamp);
698 this.edgesDB.put(e, props);
699 log.trace("Edge {} {}", e.toString(), type.name());
702 return new TopoEdgeUpdate(e, props, type);
706 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
707 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
708 for (int i = 0; i < topoedgeupdateList.size(); i++) {
709 Edge e = topoedgeupdateList.get(i).getEdge();
710 Set<Property> p = topoedgeupdateList.get(i).getProperty();
711 UpdateType type = topoedgeupdateList.get(i).getUpdateType();
712 TopoEdgeUpdate teu = edgeUpdate(e, type, p);
718 if (!teuList.isEmpty()) {
719 // Now update the listeners
720 for (ITopologyManagerAware s : this.topologyManagerAware) {
722 s.edgeUpdate(teuList);
723 } catch (Exception exc) {
724 log.error("Exception on edge update:", exc);
730 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
731 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
732 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
733 return getLinkTuple(rLink);
737 private Edge getLinkTuple(TopologyUserLinkConfig link) {
738 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
739 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
741 return new Edge(srcNodeConnector, dstNodeConnector);
742 } catch (Exception e) {
748 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
749 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
753 public Status addUserLink(TopologyUserLinkConfig userLink) {
754 if (!userLink.isValid()) {
755 return new Status(StatusCode.BADREQUEST,
756 "User link configuration invalid.");
758 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
760 //Check if this link already configured
761 //NOTE: infinispan cache doesn't support Map.containsValue()
762 // (which is linear time in most ConcurrentMap impl anyway)
763 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
764 if (existingLink.equals(userLink)) {
765 return new Status(StatusCode.CONFLICT, "Link configuration exists");
768 //attempt put, if mapping for this key already existed return conflict
769 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
770 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
771 + " already exists. Please use another name");
774 Edge linkTuple = getLinkTuple(userLink);
775 if (linkTuple != null) {
776 if (!isProductionLink(linkTuple)) {
777 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
778 new HashSet<Property>());
780 userLinksDB.remove(userLink.getName());
781 return new Status(StatusCode.NOTFOUND,
782 "Link configuration contains invalid node connector: "
787 linkTuple = getReverseLinkTuple(userLink);
788 if (linkTuple != null) {
789 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
790 if (!isProductionLink(linkTuple)) {
791 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
795 return new Status(StatusCode.SUCCESS);
799 public Status deleteUserLink(String linkName) {
800 if (linkName == null) {
801 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
804 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
806 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
807 if (! isProductionLink(linkTuple)) {
808 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
811 linkTuple = getReverseLinkTuple(link);
812 if (! isProductionLink(linkTuple)) {
813 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
816 return new Status(StatusCode.SUCCESS);
819 private void registerWithOSGIConsole() {
820 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
822 bundleContext.registerService(CommandProvider.class.getName(), this,
827 public String getHelp() {
828 StringBuffer help = new StringBuffer();
829 help.append("---Topology Manager---\n");
830 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
831 help.append("\t deleteUserLink <name>\n");
832 help.append("\t printUserLink\n");
833 help.append("\t printNodeEdges\n");
834 return help.toString();
837 public void _printUserLink(CommandInterpreter ci) {
838 for (String name : this.userLinksDB.keySet()) {
839 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
840 ci.println("Name : " + name);
841 ci.println(linkConfig);
842 ci.println("Edge " + getLinkTuple(linkConfig));
843 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
847 public void _addUserLink(CommandInterpreter ci) {
848 String name = ci.nextArgument();
849 if ((name == null)) {
850 ci.println("Please enter a valid Name");
854 String ncStr1 = ci.nextArgument();
855 if (ncStr1 == null) {
856 ci.println("Please enter two node connector strings");
859 String ncStr2 = ci.nextArgument();
860 if (ncStr2 == null) {
861 ci.println("Please enter second node connector string");
865 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
867 ci.println("Invalid input node connector 1 string: " + ncStr1);
870 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
872 ci.println("Invalid input node connector 2 string: " + ncStr2);
876 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
877 ci.println(this.addUserLink(config));
880 public void _deleteUserLink(CommandInterpreter ci) {
881 String name = ci.nextArgument();
882 if ((name == null)) {
883 ci.println("Please enter a valid Name");
886 this.deleteUserLink(name);
889 public void _printNodeEdges(CommandInterpreter ci) {
890 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
891 if (nodeEdges == null) {
894 Set<Node> nodeSet = nodeEdges.keySet();
895 if (nodeSet == null) {
898 ci.println(" Node Edge");
899 for (Node node : nodeSet) {
900 Set<Edge> edgeSet = nodeEdges.get(node);
901 if (edgeSet == null) {
904 for (Edge edge : edgeSet) {
905 ci.println(node + " " + edge);
911 public Object readObject(ObjectInputStream ois)
912 throws FileNotFoundException, IOException, ClassNotFoundException {
913 return ois.readObject();
917 public Status saveConfiguration() {
922 public void edgeOverUtilized(Edge edge) {
923 log.warn("Link Utilization above normal: {}", edge);
927 public void edgeUtilBackToNormal(Edge edge) {
928 log.warn("Link Utilization back to normal: {}", edge);
931 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
932 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
933 upd.setLocal(isLocal);
938 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
939 if (cacheName.equals(TOPOEDGESDB)) {
940 // This is the case of an Edge being added to the topology DB
941 final Edge e = (Edge) key;
942 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
943 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
948 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
949 if (cacheName.equals(TOPOEDGESDB)) {
950 final Edge e = (Edge) key;
951 log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
952 final Set<Property> props = (Set<Property>) new_value;
953 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
958 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
959 if (cacheName.equals(TOPOEDGESDB)) {
960 final Edge e = (Edge) key;
961 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
962 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
966 class TopologyNotify implements Runnable {
967 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
968 private TopoEdgeUpdate entry;
969 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
970 private boolean notifyListeners;
972 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
973 this.notifyQ = notifyQ;
980 log.trace("New run of TopologyNotify");
981 notifyListeners = false;
982 // First we block waiting for an element to get in
983 entry = notifyQ.take();
984 // Then we drain the whole queue if elements are
985 // in it without getting into any blocking condition
986 for (; entry != null; entry = notifyQ.poll()) {
988 notifyListeners = true;
991 // Notify listeners only if there were updates drained else
993 if (notifyListeners) {
994 log.trace("Notifier thread, notified a listener");
995 // Now update the listeners
996 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
998 s.edgeUpdate(teuList);
999 } catch (Exception exc) {
1000 log.error("Exception on edge update:", exc);
1006 // Lets sleep for sometime to allow aggregation of event
1008 } catch (InterruptedException e1) {
1012 log.warn("TopologyNotify interrupted {}", e1.getMessage());
1013 } catch (Exception e2) {
1020 public List<String> printUserLink() {
1021 List<String> result = new ArrayList<String>();
1022 for (String name : this.userLinksDB.keySet()) {
1023 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1024 result.add("Name : " + name);
1025 result.add(linkConfig.toString());
1026 result.add("Edge " + getLinkTuple(linkConfig));
1027 result.add("Reverse Edge " + getReverseLinkTuple(linkConfig));
1032 public List<String> addUserLink(String name, String ncStr1, String ncStr2) {
1033 List<String> result = new ArrayList<String>();
1034 if ((name == null)) {
1035 result.add("Please enter a valid Name");
1039 if (ncStr1 == null) {
1040 result.add("Please enter two node connector strings");
1043 if (ncStr2 == null) {
1044 result.add("Please enter second node connector string");
1048 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1050 result.add("Invalid input node connector 1 string: " + ncStr1);
1053 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1055 result.add("Invalid input node connector 2 string: " + ncStr2);
1059 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1060 result.add(this.addUserLink(config).toString());
1064 public List<String> deleteUserLinkShell(String name) {
1065 List<String> result = new ArrayList<String>();
1066 if ((name == null)) {
1067 result.add("Please enter a valid Name");
1070 this.deleteUserLink(name);
1074 public List<String> printNodeEdges() {
1075 List<String> result = new ArrayList<String>();
1076 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1077 if (nodeEdges == null) {
1080 Set<Node> nodeSet = nodeEdges.keySet();
1081 if (nodeSet == null) {
1084 result.add(" Node Edge");
1085 for (Node node : nodeSet) {
1086 Set<Edge> edgeSet = nodeEdges.get(node);
1087 if (edgeSet == null) {
1090 for (Edge edge : edgeSet) {
1091 result.add(node + " " + edge);