2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.topologymanager.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
53 import org.opendaylight.controller.sal.utils.ObjectReader;
54 import org.opendaylight.controller.sal.utils.ObjectWriter;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.controller.switchmanager.ISwitchManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManager;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
60 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
61 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
62 import org.osgi.framework.BundleContext;
63 import org.osgi.framework.FrameworkUtil;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
68 * The class describes TopologyManager which is the central repository of the
69 * network topology. It provides service for applications to interact with
70 * topology database and notifies all the listeners of topology changes.
72 public class TopologyManagerImpl implements
73 ICacheUpdateAware<Object, Object>,
75 IConfigurationContainerAware,
79 protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
80 protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
81 protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
82 protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
83 private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
84 private ITopologyService topoService;
85 private IClusterContainerServices clusterContainerService;
86 private ISwitchManager switchManager;
87 // DB of all the Edges with properties which constitute our topology
88 private ConcurrentMap<Edge, Set<Property>> edgesDB;
89 // DB of all NodeConnector which are part of ISL Edges, meaning they
90 // are connected to another NodeConnector on the other side of an ISL link.
91 // NodeConnector of a Production Edge is not part of this DB.
92 private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
93 // DB of all the NodeConnectors with an Host attached to it
94 private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
95 // Topology Manager Aware listeners
96 private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
97 // Topology Manager Aware listeners - for clusterwide updates
98 private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
99 new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
100 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
101 private String userLinksFileName;
102 private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
103 private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
104 private volatile Boolean shuttingDown = false;
105 private Thread notifyThread;
108 void nonClusterObjectCreate() {
109 edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
110 hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
111 nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
112 userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
115 void setTopologyManagerAware(ITopologyManagerAware s) {
116 if (this.topologyManagerAware != null) {
117 log.debug("Adding ITopologyManagerAware: {}", s);
118 this.topologyManagerAware.add(s);
122 void unsetTopologyManagerAware(ITopologyManagerAware s) {
123 if (this.topologyManagerAware != null) {
124 log.debug("Removing ITopologyManagerAware: {}", s);
125 this.topologyManagerAware.remove(s);
129 void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
130 if (this.topologyManagerClusterWideAware != null) {
131 log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
132 this.topologyManagerClusterWideAware.add(s);
136 void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
137 if (this.topologyManagerClusterWideAware != null) {
138 log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
139 this.topologyManagerClusterWideAware.remove(s);
143 void setTopoService(ITopologyService s) {
144 log.debug("Adding ITopologyService: {}", s);
145 this.topoService = s;
148 void unsetTopoService(ITopologyService s) {
149 if (this.topoService == s) {
150 log.debug("Removing ITopologyService: {}", s);
151 this.topoService = null;
155 void setClusterContainerService(IClusterContainerServices s) {
156 log.debug("Cluster Service set");
157 this.clusterContainerService = s;
160 void unsetClusterContainerService(IClusterContainerServices s) {
161 if (this.clusterContainerService == s) {
162 log.debug("Cluster Service removed!");
163 this.clusterContainerService = null;
167 void setSwitchManager(ISwitchManager s) {
168 log.debug("Adding ISwitchManager: {}", s);
169 this.switchManager = s;
172 void unsetSwitchManager(ISwitchManager s) {
173 if (this.switchManager == s) {
174 log.debug("Removing ISwitchManager: {}", s);
175 this.switchManager = null;
180 * Function called by the dependency manager when all the required
181 * dependencies are satisfied
184 void init(Component c) {
187 String containerName = null;
188 Dictionary<?, ?> props = c.getServiceProperties();
190 containerName = (String) props.get("containerName");
192 // In the Global instance case the containerName is empty
193 containerName = "UNKNOWN";
196 userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
197 registerWithOSGIConsole();
199 // Restore the shuttingDown status on init of the component
200 shuttingDown = false;
201 notifyThread = new Thread(new TopologyNotify(notifyQ));
204 @SuppressWarnings({ "unchecked" })
205 private void allocateCaches() {
207 (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
210 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
212 this.nodeConnectorsDB =
213 (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
214 TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
216 (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
217 TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
220 private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
221 ConcurrentMap<?, ?> cache = null;
223 cache = this.clusterContainerService.createCache(cacheName, cacheModes);
224 } catch (CacheExistException e) {
225 log.debug(cacheName + " cache already exists - destroy and recreate if needed");
226 } catch (CacheConfigException e) {
227 log.error(cacheName + " cache configuration invalid - check cache mode");
232 @SuppressWarnings({ "unchecked" })
233 private void retrieveCaches() {
234 if (this.clusterContainerService == null) {
235 log.error("Cluster Services is null, can't retrieve caches.");
239 this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
240 if (edgesDB == null) {
241 log.error("Failed to get cache for " + TOPOEDGESDB);
245 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
246 if (hostsDB == null) {
247 log.error("Failed to get cache for " + TOPOHOSTSDB);
250 this.nodeConnectorsDB =
251 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
252 if (nodeConnectorsDB == null) {
253 log.error("Failed to get cache for " + TOPONODECONNECTORDB);
257 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
258 if (userLinksDB == null) {
259 log.error("Failed to get cache for " + TOPOUSERLINKSDB);
264 * Function called after the topology manager has registered the service in
265 * OSGi service registry.
269 // Start the batcher thread for the cluster wide topology updates
270 notifyThread.start();
271 // SollicitRefresh MUST be called here else if called at init
272 // time it may sollicit refresh too soon.
273 log.debug("Sollicit topology refresh");
274 topoService.sollicitRefresh();
279 notifyThread.interrupt();
283 * Function called by the dependency manager when at least one dependency
284 * become unsatisfied or when the component is shutting down because for
285 * example bundle is being stopped.
293 @SuppressWarnings("unchecked")
294 private void loadConfiguration() {
295 ObjectReader objReader = new ObjectReader();
296 ConcurrentMap<String, TopologyUserLinkConfig> confList =
297 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
299 if (confList != null) {
300 for (TopologyUserLinkConfig conf : confList.values()) {
307 public Status saveConfig() {
308 return saveConfigInternal();
311 public Status saveConfigInternal() {
312 ObjectWriter objWriter = new ObjectWriter();
314 Status saveStatus = objWriter.write(
315 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
317 if (! saveStatus.isSuccess()) {
318 return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
324 public Map<Node, Set<Edge>> getNodeEdges() {
325 if (this.edgesDB == null) {
329 Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
330 for (Edge edge : this.edgesDB.keySet()) {
331 // Lets analyze the tail
332 Node node = edge.getTailNodeConnector().getNode();
333 Set<Edge> nodeEdges = res.get(node);
334 if (nodeEdges == null) {
335 nodeEdges = new HashSet<Edge>();
336 res.put(node, nodeEdges);
340 // Lets analyze the head
341 node = edge.getHeadNodeConnector().getNode();
342 nodeEdges = res.get(node);
343 if (nodeEdges == null) {
344 nodeEdges = new HashSet<Edge>();
345 res.put(node, nodeEdges);
354 public boolean isInternal(NodeConnector p) {
355 if (this.nodeConnectorsDB == null) {
359 // This is an internal NodeConnector if is connected to
360 // another Node i.e it's part of the nodeConnectorsDB
361 return (this.nodeConnectorsDB.get(p) != null);
365 * This method returns true if the edge is an ISL link.
369 * @return true if it is an ISL link
371 public boolean isISLink(Edge e) {
372 return (!isProductionLink(e));
376 * This method returns true if the edge is a production link.
380 * @return true if it is a production link
382 public boolean isProductionLink(Edge e) {
383 return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
384 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
388 * This method cross checks the determination of nodeConnector type by Discovery Service
389 * against the information in SwitchManager and updates it accordingly.
393 private void crossCheckNodeConnectors(Edge e) {
395 if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
396 nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
398 e.setHeadNodeConnector(nc);
401 if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
402 nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
404 e.setTailNodeConnector(nc);
410 * A NodeConnector may have been categorized as of type Production by Discovery Service.
411 * But at the time when this determination was made, only OF nodes were known to Discovery
412 * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
413 * so, then it returns a new NodeConnector with correct type.
416 * NodeConnector as passed on in the edge
418 * If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
419 * with correct type, null otherwise
422 private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
424 for (Node node : switchManager.getNodes()) {
425 String nodeName = node.getNodeIDString();
426 log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
427 nc.getNode().getNodeIDString());
428 if (nodeName.equals(nc.getNode().getNodeIDString())) {
429 NodeConnector nodeConnector = NodeConnectorCreator
430 .createNodeConnector(node.getType(), nc.getID(), node);
431 return nodeConnector;
438 * The Map returned is a copy of the current topology hence if the topology
439 * changes the copy doesn't
441 * @return A Map representing the current topology expressed as edges of the
445 public Map<Edge, Set<Property>> getEdges() {
446 if (this.edgesDB == null) {
450 Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
452 for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
453 // Sets of props are copied because the composition of
454 // those properties could change with time
455 props = new HashSet<Property>(edgeEntry.getValue());
456 // We can simply reuse the key because the object is
457 // immutable so doesn't really matter that we are
458 // referencing the only owned by a different table, the
459 // meaning is the same because doesn't change with time.
460 edgeMap.put(edgeEntry.getKey(), props);
467 public Set<NodeConnector> getNodeConnectorWithHost() {
468 if (this.hostsDB == null) {
472 return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
476 public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
477 if (this.hostsDB == null) {
480 HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
482 Set<NodeConnector> portSet;
483 for (NodeConnector nc : this.hostsDB.keySet()) {
485 portSet = res.get(node);
486 if (portSet == null) {
487 // Create the HashSet if null
488 portSet = new HashSet<NodeConnector>();
489 res.put(node, portSet);
492 // Keep updating the HashSet, given this is not a
493 // clustered map we can just update the set without
494 // worrying to update the hashmap.
502 public Host getHostAttachedToNodeConnector(NodeConnector port) {
503 List<Host> hosts = getHostsAttachedToNodeConnector(port);
504 if(hosts != null && !hosts.isEmpty()){
511 public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
512 Set<ImmutablePair<Host, Set<Property>>> hosts;
513 if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
516 // create a list of hosts
517 List<Host> retHosts = new LinkedList<Host>();
518 for(ImmutablePair<Host, Set<Property>> host : hosts) {
519 retHosts.add(host.getLeft());
525 public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
527 // Clone the property set in case non null else just
528 // create an empty one. Caches allocated via infinispan
529 // don't allow null values
531 props = new HashSet<Property>();
533 props = new HashSet<Property>(props);
535 ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
538 Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
539 if(hostSet == null) {
540 hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
545 hostSet.add(thisHost);
546 this.hostsDB.put(port, hostSet);
549 hostSet.remove(thisHost);
550 if(hostSet.isEmpty()) {
551 //remove only if hasn't been concurrently modified
552 this.hostsDB.remove(port, hostSet);
554 this.hostsDB.put(port, hostSet);
560 private boolean headNodeConnectorExist(Edge e) {
562 * Only check the head end point which is supposed to be part of a
563 * network node we control (present in our inventory). If we checked the
564 * tail end point as well, we would not store the edges that connect to
565 * a non sdn enable port on a non sdn capable production switch. We want
566 * to be able to see these switches on the topology.
568 NodeConnector head = e.getHeadNodeConnector();
569 return (switchManager.doesNodeConnectorExist(head));
572 private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
575 // Avoid redundant update as notifications trigger expensive tasks
576 if (edgesDB.containsKey(e)) {
577 log.trace("Skipping redundant edge addition: {}", e);
581 // Ensure that head node connector exists
582 if (!headNodeConnectorExist(e)) {
583 log.warn("Ignore edge that contains invalid node connector: {}", e);
587 // Check if nodeConnectors of the edge were correctly categorized
589 crossCheckNodeConnectors(e);
591 // Make sure the props are non-null
593 props = new HashSet<Property>();
595 props = new HashSet<Property>(props);
598 //in case of node switch-over to a different cluster controller,
599 //let's retain edge props
600 Set<Property> currentProps = this.edgesDB.get(e);
601 if (currentProps != null){
602 props.addAll(currentProps);
605 // Now make sure there is the creation timestamp for the
606 // edge, if not there, stamp with the first update
607 boolean found_create = false;
608 for (Property prop : props) {
609 if (prop instanceof TimeStamp) {
610 TimeStamp t = (TimeStamp) prop;
611 if (t.getTimeStampName().equals("creation")) {
619 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
623 // Now add this in the database eventually overriding
624 // something that may have been already existing
625 this.edgesDB.put(e, props);
627 // Now populate the DB of NodeConnectors
628 // NOTE WELL: properties are empty sets, not really needed
630 // The DB only contains ISL ports
632 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
633 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
635 log.trace("Edge {} {}", e.toString(), type.name());
638 // Now remove the edge from edgesDB
639 this.edgesDB.remove(e);
641 // Now lets update the NodeConnectors DB, the assumption
642 // here is that two NodeConnector are exclusively
643 // connected by 1 and only 1 edge, this is reasonable in
644 // the same plug (virtual of phisical) we can assume two
645 // cables won't be plugged. This could break only in case
646 // of devices in the middle that acts as hubs, but it
647 // should be safe to assume that won't happen.
648 this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
649 this.nodeConnectorsDB.remove(e.getTailNodeConnector());
650 log.trace("Edge {} {}", e.toString(), type.name());
653 Set<Property> oldProps = this.edgesDB.get(e);
655 // When property changes lets make sure we can change it
656 // all except the creation time stamp because that should
657 // be changed only when the edge is destroyed and created
659 TimeStamp timeStamp = null;
660 for (Property prop : oldProps) {
661 if (prop instanceof TimeStamp) {
662 TimeStamp tsProp = (TimeStamp) prop;
663 if (tsProp.getTimeStampName().equals("creation")) {
670 // Now lets make sure new properties are non-null
672 props = new HashSet<Property>();
674 // Copy the set so noone is going to change the content
675 props = new HashSet<Property>(props);
678 // Now lets remove the creation property if exist in the
680 for (Iterator<Property> i = props.iterator(); i.hasNext();) {
681 Property prop = i.next();
682 if (prop instanceof TimeStamp) {
683 TimeStamp t = (TimeStamp) prop;
684 if (t.getTimeStampName().equals("creation")) {
691 // Now lets add the creation timestamp in it
692 if (timeStamp != null) {
693 props.add(timeStamp);
697 this.edgesDB.put(e, props);
698 log.trace("Edge {} {}", e.toString(), type.name());
701 return new TopoEdgeUpdate(e, props, type);
705 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
706 List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
707 for (int i = 0; i < topoedgeupdateList.size(); i++) {
708 Edge e = topoedgeupdateList.get(i).getEdge();
709 Set<Property> p = topoedgeupdateList.get(i).getProperty();
710 UpdateType type = topoedgeupdateList.get(i).getUpdateType();
711 TopoEdgeUpdate teu = edgeUpdate(e, type, p);
717 if (!teuList.isEmpty()) {
718 // Now update the listeners
719 for (ITopologyManagerAware s : this.topologyManagerAware) {
721 s.edgeUpdate(teuList);
722 } catch (Exception exc) {
723 log.error("Exception on edge update:", exc);
729 private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
730 TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
731 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
732 return getLinkTuple(rLink);
736 private Edge getLinkTuple(TopologyUserLinkConfig link) {
737 NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
738 NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
740 return new Edge(srcNodeConnector, dstNodeConnector);
741 } catch (Exception e) {
747 public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
748 return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
752 public Status addUserLink(TopologyUserLinkConfig userLink) {
753 if (!userLink.isValid()) {
754 return new Status(StatusCode.BADREQUEST,
755 "User link configuration invalid.");
757 userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
759 //Check if this link already configured
760 //NOTE: infinispan cache doesn't support Map.containsValue()
761 // (which is linear time in most ConcurrentMap impl anyway)
762 for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
763 if (existingLink.equals(userLink)) {
764 return new Status(StatusCode.CONFLICT, "Link configuration exists");
767 //attempt put, if mapping for this key already existed return conflict
768 if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
769 return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
770 + " already exists. Please use another name");
773 Edge linkTuple = getLinkTuple(userLink);
774 if (linkTuple != null) {
775 if (!isProductionLink(linkTuple)) {
776 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
777 new HashSet<Property>());
779 userLinksDB.remove(userLink.getName());
780 return new Status(StatusCode.NOTFOUND,
781 "Link configuration contains invalid node connector: "
786 linkTuple = getReverseLinkTuple(userLink);
787 if (linkTuple != null) {
788 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
789 if (!isProductionLink(linkTuple)) {
790 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
794 return new Status(StatusCode.SUCCESS);
798 public Status deleteUserLink(String linkName) {
799 if (linkName == null) {
800 return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
803 TopologyUserLinkConfig link = userLinksDB.remove(linkName);
805 if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
806 if (! isProductionLink(linkTuple)) {
807 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
810 linkTuple = getReverseLinkTuple(link);
811 if (! isProductionLink(linkTuple)) {
812 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
815 return new Status(StatusCode.SUCCESS);
818 private void registerWithOSGIConsole() {
819 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
821 bundleContext.registerService(CommandProvider.class.getName(), this,
826 public String getHelp() {
827 StringBuffer help = new StringBuffer();
828 help.append("---Topology Manager---\n");
829 help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
830 help.append("\t deleteUserLink <name>\n");
831 help.append("\t printUserLink\n");
832 help.append("\t printNodeEdges\n");
833 return help.toString();
836 public void _printUserLink(CommandInterpreter ci) {
837 for (String name : this.userLinksDB.keySet()) {
838 TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
839 ci.println("Name : " + name);
840 ci.println(linkConfig);
841 ci.println("Edge " + getLinkTuple(linkConfig));
842 ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
846 public void _addUserLink(CommandInterpreter ci) {
847 String name = ci.nextArgument();
848 if ((name == null)) {
849 ci.println("Please enter a valid Name");
853 String ncStr1 = ci.nextArgument();
854 if (ncStr1 == null) {
855 ci.println("Please enter two node connector strings");
858 String ncStr2 = ci.nextArgument();
859 if (ncStr2 == null) {
860 ci.println("Please enter second node connector string");
864 NodeConnector nc1 = NodeConnector.fromString(ncStr1);
866 ci.println("Invalid input node connector 1 string: " + ncStr1);
869 NodeConnector nc2 = NodeConnector.fromString(ncStr2);
871 ci.println("Invalid input node connector 2 string: " + ncStr2);
875 TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
876 ci.println(this.addUserLink(config));
879 public void _deleteUserLink(CommandInterpreter ci) {
880 String name = ci.nextArgument();
881 if ((name == null)) {
882 ci.println("Please enter a valid Name");
885 this.deleteUserLink(name);
888 public void _printNodeEdges(CommandInterpreter ci) {
889 Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
890 if (nodeEdges == null) {
893 Set<Node> nodeSet = nodeEdges.keySet();
894 if (nodeSet == null) {
897 ci.println(" Node Edge");
898 for (Node node : nodeSet) {
899 Set<Edge> edgeSet = nodeEdges.get(node);
900 if (edgeSet == null) {
903 for (Edge edge : edgeSet) {
904 ci.println(node + " " + edge);
910 public Object readObject(ObjectInputStream ois)
911 throws FileNotFoundException, IOException, ClassNotFoundException {
912 return ois.readObject();
916 public Status saveConfiguration() {
921 public void edgeOverUtilized(Edge edge) {
922 log.warn("Link Utilization above normal: {}", edge);
926 public void edgeUtilBackToNormal(Edge edge) {
927 log.warn("Link Utilization back to normal: {}", edge);
930 private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
931 TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
932 upd.setLocal(isLocal);
937 public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
938 if (cacheName.equals(TOPOEDGESDB)) {
939 // This is the case of an Edge being added to the topology DB
940 final Edge e = (Edge) key;
941 log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
942 edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
947 public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
948 if (cacheName.equals(TOPOEDGESDB)) {
949 final Edge e = (Edge) key;
950 log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
951 final Set<Property> props = (Set<Property>) new_value;
952 edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
957 public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
958 if (cacheName.equals(TOPOEDGESDB)) {
959 final Edge e = (Edge) key;
960 log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
961 edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
965 class TopologyNotify implements Runnable {
966 private final BlockingQueue<TopoEdgeUpdate> notifyQ;
967 private TopoEdgeUpdate entry;
968 private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
969 private boolean notifyListeners;
971 TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
972 this.notifyQ = notifyQ;
979 log.trace("New run of TopologyNotify");
980 notifyListeners = false;
981 // First we block waiting for an element to get in
982 entry = notifyQ.take();
983 // Then we drain the whole queue if elements are
984 // in it without getting into any blocking condition
985 for (; entry != null; entry = notifyQ.poll()) {
987 notifyListeners = true;
990 // Notify listeners only if there were updates drained else
992 if (notifyListeners) {
993 log.trace("Notifier thread, notified a listener");
994 // Now update the listeners
995 for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
997 s.edgeUpdate(teuList);
998 } catch (Exception exc) {
999 log.error("Exception on edge update:", exc);
1005 // Lets sleep for sometime to allow aggregation of event
1007 } catch (InterruptedException e1) {
1011 log.warn("TopologyNotify interrupted {}", e1.getMessage());
1012 } catch (Exception e2) {