4972d3b5b5d73d41df3cfbbb99543db8fde2eb56
[controller.git] / opendaylight / topologymanager / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Date;
16 import java.util.Dictionary;
17 import java.util.EnumSet;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.topologymanager.ITopologyManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
59 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * The class describes TopologyManager which is the central repository of the
67  * network topology. It provides service for applications to interact with
68  * topology database and notifies all the listeners of topology changes.
69  */
70 public class TopologyManagerImpl implements
71         ICacheUpdateAware,
72         ITopologyManager,
73         IConfigurationContainerAware,
74         IListenTopoUpdates,
75         IObjectReader,
76         CommandProvider {
77     static final String TOPOEDGESDB = "topologymanager.edgesDB";
78     static final String TOPOHOSTSDB = "topologymanager.hostsDB";
79     static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
80     static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
81     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
82     private static final String SAVE = "Save";
83     private ITopologyService topoService;
84     private IClusterContainerServices clusterContainerService;
85     // DB of all the Edges with properties which constitute our topology
86     private ConcurrentMap<Edge, Set<Property>> edgesDB;
87     // DB of all NodeConnector which are part of ISL Edges, meaning they
88     // are connected to another NodeConnector on the other side of an ISL link.
89     // NodeConnector of a Production Edge is not part of this DB.
90     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
91     // DB of all the NodeConnectors with an Host attached to it
92     private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
93     // Topology Manager Aware listeners
94     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
95     // Topology Manager Aware listeners - for clusterwide updates
96     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
97             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
98     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
99     private String userLinksFileName;
100     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
101     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
102     private volatile Boolean shuttingDown = false;
103     private Thread notifyThread;
104
105
106     void nonClusterObjectCreate() {
107         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
108         hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
109         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
110         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
111     }
112
113     void setTopologyManagerAware(ITopologyManagerAware s) {
114         if (this.topologyManagerAware != null) {
115             log.debug("Adding ITopologyManagerAware: {}", s);
116             this.topologyManagerAware.add(s);
117         }
118     }
119
120     void unsetTopologyManagerAware(ITopologyManagerAware s) {
121         if (this.topologyManagerAware != null) {
122             log.debug("Removing ITopologyManagerAware: {}", s);
123             this.topologyManagerAware.remove(s);
124         }
125     }
126
127     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
128         if (this.topologyManagerClusterWideAware != null) {
129             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
130             this.topologyManagerClusterWideAware.add(s);
131         }
132     }
133
134     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
135         if (this.topologyManagerClusterWideAware != null) {
136             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
137             this.topologyManagerClusterWideAware.remove(s);
138         }
139     }
140
141     void setTopoService(ITopologyService s) {
142         log.debug("Adding ITopologyService: {}", s);
143         this.topoService = s;
144     }
145
146     void unsetTopoService(ITopologyService s) {
147         if (this.topoService == s) {
148             log.debug("Removing ITopologyService: {}", s);
149             this.topoService = null;
150         }
151     }
152
153     void setClusterContainerService(IClusterContainerServices s) {
154         log.debug("Cluster Service set");
155         this.clusterContainerService = s;
156     }
157
158     void unsetClusterContainerService(IClusterContainerServices s) {
159         if (this.clusterContainerService == s) {
160             log.debug("Cluster Service removed!");
161             this.clusterContainerService = null;
162         }
163     }
164
165     /**
166      * Function called by the dependency manager when all the required
167      * dependencies are satisfied
168      *
169      */
170     void init(Component c) {
171         allocateCaches();
172         retrieveCaches();
173         String containerName = null;
174         Dictionary<?, ?> props = c.getServiceProperties();
175         if (props != null) {
176             containerName = (String) props.get("containerName");
177         } else {
178             // In the Global instance case the containerName is empty
179             containerName = "UNKNOWN";
180         }
181
182         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
183         registerWithOSGIConsole();
184         loadConfiguration();
185         // Restore the shuttingDown status on init of the component
186         shuttingDown = false;
187         notifyThread = new Thread(new TopologyNotify(notifyQ));
188     }
189
190     @SuppressWarnings({ "unchecked", "deprecation" })
191     private void allocateCaches() {
192         try {
193             this.edgesDB =
194                     (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
195                             EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
196         } catch (CacheExistException cee) {
197             log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
198         } catch (CacheConfigException cce) {
199             log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
200         }
201
202         try {
203             this.hostsDB =
204                     (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
205                             TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
206         } catch (CacheExistException cee) {
207             log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
208         } catch (CacheConfigException cce) {
209             log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
210         }
211
212         try {
213             this.nodeConnectorsDB =
214                     (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
215                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
216         } catch (CacheExistException cee) {
217             log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
218         } catch (CacheConfigException cce) {
219             log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
220         }
221
222         try {
223             this.userLinksDB =
224                     (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
225                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
226         } catch (CacheExistException cee) {
227             log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
228         } catch (CacheConfigException cce) {
229             log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
230         }
231     }
232
233     @SuppressWarnings({ "unchecked", "deprecation" })
234     private void retrieveCaches() {
235         if (this.clusterContainerService == null) {
236             log.error("Cluster Services is null, can't retrieve caches.");
237             return;
238         }
239
240         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
241         if (edgesDB == null) {
242             log.error("Failed to get cache for " + TOPOEDGESDB);
243         }
244
245         this.hostsDB =
246                 (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
247         if (hostsDB == null) {
248             log.error("Failed to get cache for " + TOPOHOSTSDB);
249         }
250
251         this.nodeConnectorsDB =
252                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
253         if (nodeConnectorsDB == null) {
254             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
255         }
256
257         this.userLinksDB =
258                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
259         if (userLinksDB == null) {
260             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
261         }
262     }
263
264     /**
265      * Function called after the topology manager has registered the service in
266      * OSGi service registry.
267      *
268      */
269     void started() {
270         // Start the batcher thread for the cluster wide topology updates
271         notifyThread.start();
272         // SollicitRefresh MUST be called here else if called at init
273         // time it may sollicit refresh too soon.
274         log.debug("Sollicit topology refresh");
275         topoService.sollicitRefresh();
276     }
277
278     void stop() {
279         shuttingDown = true;
280         notifyThread.interrupt();
281     }
282
283     /**
284      * Function called by the dependency manager when at least one dependency
285      * become unsatisfied or when the component is shutting down because for
286      * example bundle is being stopped.
287      *
288      */
289     void destroy() {
290         notifyQ.clear();
291         notifyThread = null;
292     }
293
294     @SuppressWarnings("unchecked")
295     private void loadConfiguration() {
296         ObjectReader objReader = new ObjectReader();
297         ConcurrentMap<String, TopologyUserLinkConfig> confList =
298                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
299
300         if (confList != null) {
301             for (TopologyUserLinkConfig conf : confList.values()) {
302                 addUserLink(conf);
303             }
304         }
305     }
306
307     @Override
308     public Status saveConfig() {
309         return saveConfigInternal();
310     }
311
312     public Status saveConfigInternal() {
313         ObjectWriter objWriter = new ObjectWriter();
314
315         Status saveStatus = objWriter.write(
316                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
317
318         if (! saveStatus.isSuccess()) {
319             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
320         }
321         return saveStatus;
322     }
323
324     @Override
325     public Map<Node, Set<Edge>> getNodeEdges() {
326         if (this.edgesDB == null) {
327             return null;
328         }
329
330         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
331         for (Edge edge : this.edgesDB.keySet()) {
332             // Lets analyze the tail
333             Node node = edge.getTailNodeConnector().getNode();
334             Set<Edge> nodeEdges = res.get(node);
335             if (nodeEdges == null) {
336                 nodeEdges = new HashSet<Edge>();
337                 res.put(node, nodeEdges);
338             }
339             nodeEdges.add(edge);
340
341             // Lets analyze the head
342             node = edge.getHeadNodeConnector().getNode();
343             nodeEdges = res.get(node);
344             if (nodeEdges == null) {
345                 nodeEdges = new HashSet<Edge>();
346                 res.put(node, nodeEdges);
347             }
348             nodeEdges.add(edge);
349         }
350
351         return res;
352     }
353
354     @Override
355     public boolean isInternal(NodeConnector p) {
356         if (this.nodeConnectorsDB == null) {
357             return false;
358         }
359
360         // This is an internal NodeConnector if is connected to
361         // another Node i.e it's part of the nodeConnectorsDB
362         return (this.nodeConnectorsDB.get(p) != null);
363     }
364
365     /**
366      * This method returns true if the edge is an ISL link.
367      *
368      * @param e
369      *            The edge
370      * @return true if it is an ISL link
371      */
372     public boolean isISLink(Edge e) {
373         return (!isProductionLink(e));
374     }
375
376     /**
377      * This method returns true if the edge is a production link.
378      *
379      * @param e
380      *            The edge
381      * @return true if it is a production link
382      */
383     public boolean isProductionLink(Edge e) {
384         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
385                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
386     }
387
388     /**
389      * The Map returned is a copy of the current topology hence if the topology
390      * changes the copy doesn't
391      *
392      * @return A Map representing the current topology expressed as edges of the
393      *         network
394      */
395     @Override
396     public Map<Edge, Set<Property>> getEdges() {
397         if (this.edgesDB == null) {
398             return null;
399         }
400
401         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
402         Set<Property> props;
403         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
404             // Sets of props are copied because the composition of
405             // those properties could change with time
406             props = new HashSet<Property>(edgeEntry.getValue());
407             // We can simply reuse the key because the object is
408             // immutable so doesn't really matter that we are
409             // referencing the only owned by a different table, the
410             // meaning is the same because doesn't change with time.
411             edgeMap.put(edgeEntry.getKey(), props);
412         }
413
414         return edgeMap;
415     }
416
417     @Override
418     public Set<NodeConnector> getNodeConnectorWithHost() {
419         if (this.hostsDB == null) {
420             return null;
421         }
422
423         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
424     }
425
426     @Override
427     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
428         if (this.hostsDB == null) {
429             return null;
430         }
431         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
432         Node node;
433         Set<NodeConnector> portSet;
434         for (NodeConnector nc : this.hostsDB.keySet()) {
435             node = nc.getNode();
436             portSet = res.get(node);
437             if (portSet == null) {
438                 // Create the HashSet if null
439                 portSet = new HashSet<NodeConnector>();
440                 res.put(node, portSet);
441             }
442
443             // Keep updating the HashSet, given this is not a
444             // clustered map we can just update the set without
445             // worrying to update the hashmap.
446             portSet.add(nc);
447         }
448
449         return (res);
450     }
451
452     @Override
453     public Host getHostAttachedToNodeConnector(NodeConnector port) {
454         ImmutablePair<Host, Set<Property>> host;
455         if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
456             return null;
457         }
458         return host.getLeft();
459     }
460
461     @Override
462     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
463
464         // Clone the property set in case non null else just
465         // create an empty one. Caches allocated via infinispan
466         // don't allow null values
467         if (props == null) {
468             props = new HashSet<Property>();
469         } else {
470             props = new HashSet<Property>(props);
471         }
472         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
473
474         switch (t) {
475         case ADDED:
476         case CHANGED:
477             this.hostsDB.put(port, thisHost);
478             break;
479         case REMOVED:
480             //remove only if hasn't been concurrently modified
481             this.hostsDB.remove(port, thisHost);
482             break;
483         }
484     }
485
486     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
487         switch (type) {
488         case ADDED:
489             // Make sure the props are non-null
490             if (props == null) {
491                 props = new HashSet<Property>();
492             } else {
493                 props = new HashSet<Property>(props);
494             }
495
496             //in case of node switch-over to a different cluster controller,
497             //let's retain edge props
498             Set<Property> currentProps = this.edgesDB.get(e);
499             if (currentProps != null){
500                 props.addAll(currentProps);
501             }
502
503             // Now make sure there is the creation timestamp for the
504             // edge, if not there, stamp with the first update
505             boolean found_create = false;
506             for (Property prop : props) {
507                 if (prop instanceof TimeStamp) {
508                     TimeStamp t = (TimeStamp) prop;
509                     if (t.getTimeStampName().equals("creation")) {
510                         found_create = true;
511                         break;
512                     }
513                 }
514             }
515
516             if (!found_create) {
517                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
518                 props.add(t);
519             }
520
521             // Now add this in the database eventually overriding
522             // something that may have been already existing
523             this.edgesDB.put(e, props);
524
525             // Now populate the DB of NodeConnectors
526             // NOTE WELL: properties are empty sets, not really needed
527             // for now.
528             // The DB only contains ISL ports
529             if (isISLink(e)) {
530                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
531                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
532             }
533             log.trace("Edge {}  {}", e.toString(), type.name());
534             break;
535         case REMOVED:
536             // Now remove the edge from edgesDB
537             this.edgesDB.remove(e);
538
539             // Now lets update the NodeConnectors DB, the assumption
540             // here is that two NodeConnector are exclusively
541             // connected by 1 and only 1 edge, this is reasonable in
542             // the same plug (virtual of phisical) we can assume two
543             // cables won't be plugged. This could break only in case
544             // of devices in the middle that acts as hubs, but it
545             // should be safe to assume that won't happen.
546             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
547             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
548             log.trace("Edge {}  {}", e.toString(), type.name());
549             break;
550         case CHANGED:
551             Set<Property> oldProps = this.edgesDB.get(e);
552
553             // When property changes lets make sure we can change it
554             // all except the creation time stamp because that should
555             // be changed only when the edge is destroyed and created
556             // again
557             TimeStamp timeStamp = null;
558             for (Property prop : oldProps) {
559                 if (prop instanceof TimeStamp) {
560                     TimeStamp tsProp = (TimeStamp) prop;
561                     if (tsProp.getTimeStampName().equals("creation")) {
562                         timeStamp = tsProp;
563                         break;
564                     }
565                 }
566             }
567
568             // Now lets make sure new properties are non-null
569             if (props == null) {
570                 props = new HashSet<Property>();
571             } else {
572                 // Copy the set so noone is going to change the content
573                 props = new HashSet<Property>(props);
574             }
575
576             // Now lets remove the creation property if exist in the
577             // new props
578             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
579                 Property prop = i.next();
580                 if (prop instanceof TimeStamp) {
581                     TimeStamp t = (TimeStamp) prop;
582                     if (t.getTimeStampName().equals("creation")) {
583                         i.remove();
584                         break;
585                     }
586                 }
587             }
588
589             // Now lets add the creation timestamp in it
590             if (timeStamp != null) {
591                 props.add(timeStamp);
592             }
593
594             // Finally update
595             this.edgesDB.put(e, props);
596             log.trace("Edge {}  {}", e.toString(), type.name());
597             break;
598         }
599         return new TopoEdgeUpdate(e, props, type);
600     }
601
602     @Override
603     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
604         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
605         for (int i = 0; i < topoedgeupdateList.size(); i++) {
606             Edge e = topoedgeupdateList.get(i).getEdge();
607             Set<Property> p = topoedgeupdateList.get(i).getProperty();
608             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
609             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
610             teuList.add(teu);
611         }
612
613         // Now update the listeners
614         for (ITopologyManagerAware s : this.topologyManagerAware) {
615             try {
616                 s.edgeUpdate(teuList);
617             } catch (Exception exc) {
618                 log.error("Exception on edge update:", exc);
619             }
620         }
621
622     }
623
624     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
625         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
626                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
627         return getLinkTuple(rLink);
628     }
629
630
631     private Edge getLinkTuple(TopologyUserLinkConfig link) {
632         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
633         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
634         try {
635             return new Edge(srcNodeConnector, dstNodeConnector);
636         } catch (Exception e) {
637             return null;
638         }
639     }
640
641     @Override
642     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
643         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
644     }
645
646     @Override
647     public Status addUserLink(TopologyUserLinkConfig userLink) {
648         if (!userLink.isValid()) {
649             return new Status(StatusCode.BADREQUEST,
650                     "User link configuration invalid.");
651         }
652         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
653
654         //Check if this link already configured
655         //NOTE: infinispan cache doesn't support Map.containsValue()
656         // (which is linear time in most ConcurrentMap impl anyway)
657         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
658             if (existingLink.equals(userLink)) {
659                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
660             }
661         }
662         //attempt put, if mapping for this key already existed return conflict
663         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
664             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
665                     + " already exists. Please use another name");
666         }
667
668         Edge linkTuple = getLinkTuple(userLink);
669         if (linkTuple != null) {
670             if (!isProductionLink(linkTuple)) {
671                 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
672             }
673
674             linkTuple = getReverseLinkTuple(userLink);
675             if (linkTuple != null) {
676                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
677                 if (!isProductionLink(linkTuple)) {
678                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
679                 }
680             }
681         }
682         return new Status(StatusCode.SUCCESS);
683     }
684
685     @Override
686     public Status deleteUserLink(String linkName) {
687         if (linkName == null) {
688             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
689         }
690
691         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
692         Edge linkTuple;
693         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
694             if (! isProductionLink(linkTuple)) {
695                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
696             }
697
698             linkTuple = getReverseLinkTuple(link);
699             if (! isProductionLink(linkTuple)) {
700                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
701             }
702         }
703         return new Status(StatusCode.SUCCESS);
704     }
705
706     private void registerWithOSGIConsole() {
707         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
708                 .getBundleContext();
709         bundleContext.registerService(CommandProvider.class.getName(), this,
710                 null);
711     }
712
713     @Override
714     public String getHelp() {
715         StringBuffer help = new StringBuffer();
716         help.append("---Topology Manager---\n");
717         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
718         help.append("\t deleteUserLink <name>\n");
719         help.append("\t printUserLink\n");
720         help.append("\t printNodeEdges\n");
721         return help.toString();
722     }
723
724     public void _printUserLink(CommandInterpreter ci) {
725         for (String name : this.userLinksDB.keySet()) {
726             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
727             ci.println("Name : " + name);
728             ci.println(linkConfig);
729             ci.println("Edge " + getLinkTuple(linkConfig));
730             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
731         }
732     }
733
734     public void _addUserLink(CommandInterpreter ci) {
735         String name = ci.nextArgument();
736         if ((name == null)) {
737             ci.println("Please enter a valid Name");
738             return;
739         }
740
741         String ncStr1 = ci.nextArgument();
742         if (ncStr1 == null) {
743             ci.println("Please enter two node connector strings");
744             return;
745         }
746         String ncStr2 = ci.nextArgument();
747         if (ncStr2 == null) {
748             ci.println("Please enter second node connector string");
749             return;
750         }
751
752         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
753         if (nc1 == null) {
754             ci.println("Invalid input node connector 1 string: " + ncStr1);
755             return;
756         }
757         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
758         if (nc2 == null) {
759             ci.println("Invalid input node connector 2 string: " + ncStr2);
760             return;
761         }
762
763         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
764         ci.println(this.addUserLink(config));
765     }
766
767     public void _deleteUserLink(CommandInterpreter ci) {
768         String name = ci.nextArgument();
769         if ((name == null)) {
770             ci.println("Please enter a valid Name");
771             return;
772         }
773         this.deleteUserLink(name);
774     }
775
776     public void _printNodeEdges(CommandInterpreter ci) {
777         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
778         if (nodeEdges == null) {
779             return;
780         }
781         Set<Node> nodeSet = nodeEdges.keySet();
782         if (nodeSet == null) {
783             return;
784         }
785         ci.println("        Node                                         Edge");
786         for (Node node : nodeSet) {
787             Set<Edge> edgeSet = nodeEdges.get(node);
788             if (edgeSet == null) {
789                 continue;
790             }
791             for (Edge edge : edgeSet) {
792                 ci.println(node + "             " + edge);
793             }
794         }
795     }
796
797     @Override
798     public Object readObject(ObjectInputStream ois)
799             throws FileNotFoundException, IOException, ClassNotFoundException {
800         return ois.readObject();
801     }
802
803     @Override
804     public Status saveConfiguration() {
805         return saveConfig();
806     }
807
808     @Override
809     public void edgeOverUtilized(Edge edge) {
810         log.warn("Link Utilization above normal: {}", edge);
811     }
812
813     @Override
814     public void edgeUtilBackToNormal(Edge edge) {
815         log.warn("Link Utilization back to normal: {}", edge);
816     }
817
818     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
819         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
820         upd.setLocal(isLocal);
821         notifyQ.add(upd);
822     }
823
824     @Override
825     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
826         if (cacheName.equals(TOPOEDGESDB)) {
827             // This is the case of an Edge being added to the topology DB
828             final Edge e = (Edge) key;
829             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
830             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
831         }
832     }
833
834     @Override
835     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
836         if (cacheName.equals(TOPOEDGESDB)) {
837             final Edge e = (Edge) key;
838             log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
839             final Set<Property> props = (Set<Property>) new_value;
840             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
841         }
842     }
843
844     @Override
845     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
846         if (cacheName.equals(TOPOEDGESDB)) {
847             final Edge e = (Edge) key;
848             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
849             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
850         }
851     }
852
853     class TopologyNotify implements Runnable {
854         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
855         private TopoEdgeUpdate entry;
856         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
857         private boolean notifyListeners;
858
859         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
860             this.notifyQ = notifyQ;
861         }
862
863         @Override
864         public void run() {
865             while (true) {
866                 try {
867                     log.trace("New run of TopologyNotify");
868                     notifyListeners = false;
869                     // First we block waiting for an element to get in
870                     entry = notifyQ.take();
871                     // Then we drain the whole queue if elements are
872                     // in it without getting into any blocking condition
873                     for (; entry != null; entry = notifyQ.poll()) {
874                         teuList.add(entry);
875                         notifyListeners = true;
876                     }
877
878                     // Notify listeners only if there were updates drained else
879                     // give up
880                     if (notifyListeners) {
881                         log.trace("Notifier thread, notified a listener");
882                         // Now update the listeners
883                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
884                             try {
885                                 s.edgeUpdate(teuList);
886                             } catch (Exception exc) {
887                                 log.error("Exception on edge update:", exc);
888                             }
889                         }
890                     }
891                     teuList.clear();
892
893                     // Lets sleep for sometime to allow aggregation of event
894                     Thread.sleep(100);
895                 } catch (InterruptedException e1) {
896                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
897                     if (shuttingDown) {
898                         return;
899                     }
900                 } catch (Exception e2) {
901                     log.error("", e2);
902                 }
903             }
904         }
905     }
906 }