5a38ecd4a61db131623b1e56efef7849bbba2800
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.Set;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.CopyOnWriteArraySet;
29 import java.util.concurrent.LinkedBlockingQueue;
30
31 import org.apache.commons.lang3.tuple.ImmutablePair;
32 import org.apache.felix.dm.Component;
33 import org.eclipse.osgi.framework.console.CommandInterpreter;
34 import org.eclipse.osgi.framework.console.CommandProvider;
35 import org.opendaylight.controller.clustering.services.CacheConfigException;
36 import org.opendaylight.controller.clustering.services.CacheExistException;
37 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
38 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
39 import org.opendaylight.controller.clustering.services.IClusterServices;
40 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
41 import org.opendaylight.controller.sal.core.Edge;
42 import org.opendaylight.controller.sal.core.Host;
43 import org.opendaylight.controller.sal.core.Node;
44 import org.opendaylight.controller.sal.core.NodeConnector;
45 import org.opendaylight.controller.sal.core.Property;
46 import org.opendaylight.controller.sal.core.TimeStamp;
47 import org.opendaylight.controller.sal.core.UpdateType;
48 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
49 import org.opendaylight.controller.sal.topology.ITopologyService;
50 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
51 import org.opendaylight.controller.sal.utils.GlobalConstants;
52 import org.opendaylight.controller.sal.utils.IObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectReader;
54 import org.opendaylight.controller.sal.utils.ObjectWriter;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.controller.switchmanager.ISwitchManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManager;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
60 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
61 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
62 import org.osgi.framework.BundleContext;
63 import org.osgi.framework.FrameworkUtil;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * The class describes TopologyManager which is the central repository of the
69  * network topology. It provides service for applications to interact with
70  * topology database and notifies all the listeners of topology changes.
71  */
72 public class TopologyManagerImpl implements
73         ICacheUpdateAware,
74         ITopologyManager,
75         IConfigurationContainerAware,
76         IListenTopoUpdates,
77         IObjectReader,
78         CommandProvider {
79     static final String TOPOEDGESDB = "topologymanager.edgesDB";
80     static final String TOPOHOSTSDB = "topologymanager.hostsDB";
81     static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
82     static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
83     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
84     private ITopologyService topoService;
85     private IClusterContainerServices clusterContainerService;
86     private ISwitchManager switchManager;
87     // DB of all the Edges with properties which constitute our topology
88     private ConcurrentMap<Edge, Set<Property>> edgesDB;
89     // DB of all NodeConnector which are part of ISL Edges, meaning they
90     // are connected to another NodeConnector on the other side of an ISL link.
91     // NodeConnector of a Production Edge is not part of this DB.
92     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
93     // DB of all the NodeConnectors with an Host attached to it
94     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
95     // Topology Manager Aware listeners
96     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
97     // Topology Manager Aware listeners - for clusterwide updates
98     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
99             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
100     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
101     private String userLinksFileName;
102     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
103     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
104     private volatile Boolean shuttingDown = false;
105     private Thread notifyThread;
106
107
108     void nonClusterObjectCreate() {
109         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
110         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
111         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
112         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
113     }
114
115     void setTopologyManagerAware(ITopologyManagerAware s) {
116         if (this.topologyManagerAware != null) {
117             log.debug("Adding ITopologyManagerAware: {}", s);
118             this.topologyManagerAware.add(s);
119             // Reply all the known edges
120             if (this.edgesDB != null) {
121                 List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
122                 for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
123                     existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
124                 }
125                 s.edgeUpdate(existingEdges);
126             }
127         }
128     }
129
130     void unsetTopologyManagerAware(ITopologyManagerAware s) {
131         if (this.topologyManagerAware != null) {
132             log.debug("Removing ITopologyManagerAware: {}", s);
133             this.topologyManagerAware.remove(s);
134         }
135     }
136
137     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
138         if (this.topologyManagerClusterWideAware != null) {
139             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
140             this.topologyManagerClusterWideAware.add(s);
141             // Reply all the known edges
142             if (this.edgesDB != null) {
143                 List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
144                 for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
145                     existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
146                 }
147                 s.edgeUpdate(existingEdges);
148             }
149         }
150     }
151
152     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
153         if (this.topologyManagerClusterWideAware != null) {
154             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
155             this.topologyManagerClusterWideAware.remove(s);
156         }
157     }
158
159     void setTopoService(ITopologyService s) {
160         log.debug("Adding ITopologyService: {}", s);
161         this.topoService = s;
162     }
163
164     void unsetTopoService(ITopologyService s) {
165         if (this.topoService == s) {
166             log.debug("Removing ITopologyService: {}", s);
167             this.topoService = null;
168         }
169     }
170
171     void setClusterContainerService(IClusterContainerServices s) {
172         log.debug("Cluster Service set");
173         this.clusterContainerService = s;
174     }
175
176     void unsetClusterContainerService(IClusterContainerServices s) {
177         if (this.clusterContainerService == s) {
178             log.debug("Cluster Service removed!");
179             this.clusterContainerService = null;
180         }
181     }
182
183     void setSwitchManager(ISwitchManager s) {
184         log.debug("Adding ISwitchManager: {}", s);
185         this.switchManager = s;
186     }
187
188     void unsetSwitchManager(ISwitchManager s) {
189         if (this.switchManager == s) {
190             log.debug("Removing ISwitchManager: {}", s);
191             this.switchManager = null;
192         }
193     }
194
195     /**
196      * Function called by the dependency manager when all the required
197      * dependencies are satisfied
198      *
199      */
200     void init(Component c) {
201         allocateCaches();
202         retrieveCaches();
203         String containerName = null;
204         Dictionary<?, ?> props = c.getServiceProperties();
205         if (props != null) {
206             containerName = (String) props.get("containerName");
207         } else {
208             // In the Global instance case the containerName is empty
209             containerName = "UNKNOWN";
210         }
211
212         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
213         registerWithOSGIConsole();
214         loadConfiguration();
215         // Restore the shuttingDown status on init of the component
216         shuttingDown = false;
217         notifyThread = new Thread(new TopologyNotify(notifyQ));
218     }
219
220     @SuppressWarnings({ "unchecked", "deprecation" })
221     private void allocateCaches() {
222         try {
223             this.edgesDB =
224                     (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
225                             EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
226         } catch (CacheExistException cee) {
227             log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
228         } catch (CacheConfigException cce) {
229             log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
230         }
231
232         try {
233             this.hostsDB =
234                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
235                             TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
236         } catch (CacheExistException cee) {
237             log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
238         } catch (CacheConfigException cce) {
239             log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
240         }
241
242         try {
243             this.nodeConnectorsDB =
244                     (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
245                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
246         } catch (CacheExistException cee) {
247             log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
248         } catch (CacheConfigException cce) {
249             log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
250         }
251
252         try {
253             this.userLinksDB =
254                     (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
255                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
256         } catch (CacheExistException cee) {
257             log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
258         } catch (CacheConfigException cce) {
259             log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
260         }
261     }
262
263     @SuppressWarnings({ "unchecked", "deprecation" })
264     private void retrieveCaches() {
265         if (this.clusterContainerService == null) {
266             log.error("Cluster Services is null, can't retrieve caches.");
267             return;
268         }
269
270         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
271         if (edgesDB == null) {
272             log.error("Failed to get cache for " + TOPOEDGESDB);
273         }
274
275         this.hostsDB =
276                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
277         if (hostsDB == null) {
278             log.error("Failed to get cache for " + TOPOHOSTSDB);
279         }
280
281         this.nodeConnectorsDB =
282                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
283         if (nodeConnectorsDB == null) {
284             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
285         }
286
287         this.userLinksDB =
288                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
289         if (userLinksDB == null) {
290             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
291         }
292     }
293
294     /**
295      * Function called after the topology manager has registered the service in
296      * OSGi service registry.
297      *
298      */
299     void started() {
300         // Start the batcher thread for the cluster wide topology updates
301         notifyThread.start();
302         // SollicitRefresh MUST be called here else if called at init
303         // time it may sollicit refresh too soon.
304         log.debug("Sollicit topology refresh");
305         topoService.sollicitRefresh();
306     }
307
308     void stop() {
309         shuttingDown = true;
310         notifyThread.interrupt();
311     }
312
313     /**
314      * Function called by the dependency manager when at least one dependency
315      * become unsatisfied or when the component is shutting down because for
316      * example bundle is being stopped.
317      *
318      */
319     void destroy() {
320         notifyQ.clear();
321         notifyThread = null;
322     }
323
324     @SuppressWarnings("unchecked")
325     private void loadConfiguration() {
326         ObjectReader objReader = new ObjectReader();
327         ConcurrentMap<String, TopologyUserLinkConfig> confList =
328                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
329
330         if (confList != null) {
331             for (TopologyUserLinkConfig conf : confList.values()) {
332                 addUserLink(conf);
333             }
334         }
335     }
336
337     @Override
338     public Status saveConfig() {
339         return saveConfigInternal();
340     }
341
342     public Status saveConfigInternal() {
343         ObjectWriter objWriter = new ObjectWriter();
344
345         Status saveStatus = objWriter.write(
346                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
347
348         if (! saveStatus.isSuccess()) {
349             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
350         }
351         return saveStatus;
352     }
353
354     @Override
355     public Map<Node, Set<Edge>> getNodeEdges() {
356         if (this.edgesDB == null) {
357             return null;
358         }
359
360         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
361         for (Edge edge : this.edgesDB.keySet()) {
362             // Lets analyze the tail
363             Node node = edge.getTailNodeConnector().getNode();
364             Set<Edge> nodeEdges = res.get(node);
365             if (nodeEdges == null) {
366                 nodeEdges = new HashSet<Edge>();
367                 res.put(node, nodeEdges);
368             }
369             nodeEdges.add(edge);
370
371             // Lets analyze the head
372             node = edge.getHeadNodeConnector().getNode();
373             nodeEdges = res.get(node);
374             if (nodeEdges == null) {
375                 nodeEdges = new HashSet<Edge>();
376                 res.put(node, nodeEdges);
377             }
378             nodeEdges.add(edge);
379         }
380
381         return res;
382     }
383
384     @Override
385     public boolean isInternal(NodeConnector p) {
386         if (this.nodeConnectorsDB == null) {
387             return false;
388         }
389
390         // This is an internal NodeConnector if is connected to
391         // another Node i.e it's part of the nodeConnectorsDB
392         return (this.nodeConnectorsDB.get(p) != null);
393     }
394
395     /**
396      * This method returns true if the edge is an ISL link.
397      *
398      * @param e
399      *            The edge
400      * @return true if it is an ISL link
401      */
402     public boolean isISLink(Edge e) {
403         return (!isProductionLink(e));
404     }
405
406     /**
407      * This method returns true if the edge is a production link.
408      *
409      * @param e
410      *            The edge
411      * @return true if it is a production link
412      */
413     public boolean isProductionLink(Edge e) {
414         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
415                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
416     }
417
418     /**
419      * The Map returned is a copy of the current topology hence if the topology
420      * changes the copy doesn't
421      *
422      * @return A Map representing the current topology expressed as edges of the
423      *         network
424      */
425     @Override
426     public Map<Edge, Set<Property>> getEdges() {
427         if (this.edgesDB == null) {
428             return null;
429         }
430
431         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
432         Set<Property> props;
433         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
434             // Sets of props are copied because the composition of
435             // those properties could change with time
436             props = new HashSet<Property>(edgeEntry.getValue());
437             // We can simply reuse the key because the object is
438             // immutable so doesn't really matter that we are
439             // referencing the only owned by a different table, the
440             // meaning is the same because doesn't change with time.
441             edgeMap.put(edgeEntry.getKey(), props);
442         }
443
444         return edgeMap;
445     }
446
447     @Override
448     public Set<NodeConnector> getNodeConnectorWithHost() {
449         if (this.hostsDB == null) {
450             return null;
451         }
452
453         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
454     }
455
456     @Override
457     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
458         if (this.hostsDB == null) {
459             return null;
460         }
461         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
462         Node node;
463         Set<NodeConnector> portSet;
464         for (NodeConnector nc : this.hostsDB.keySet()) {
465             node = nc.getNode();
466             portSet = res.get(node);
467             if (portSet == null) {
468                 // Create the HashSet if null
469                 portSet = new HashSet<NodeConnector>();
470                 res.put(node, portSet);
471             }
472
473             // Keep updating the HashSet, given this is not a
474             // clustered map we can just update the set without
475             // worrying to update the hashmap.
476             portSet.add(nc);
477         }
478
479         return (res);
480     }
481
482     @Override
483     public Host getHostAttachedToNodeConnector(NodeConnector port) {
484         List<Host> hosts = getHostsAttachedToNodeConnector(port);
485         if(hosts != null && !hosts.isEmpty()){
486             return hosts.get(0);
487         }
488         return null;
489     }
490
491     @Override
492     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
493         Set<ImmutablePair<Host, Set<Property>>> hosts;
494         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
495             return null;
496         }
497         // create a list of hosts
498         List<Host> retHosts = new LinkedList<Host>();
499         for(ImmutablePair<Host, Set<Property>> host : hosts) {
500             retHosts.add(host.getLeft());
501         }
502         return retHosts;
503     }
504
505     @Override
506     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
507
508         // Clone the property set in case non null else just
509         // create an empty one. Caches allocated via infinispan
510         // don't allow null values
511         if (props == null) {
512             props = new HashSet<Property>();
513         } else {
514             props = new HashSet<Property>(props);
515         }
516         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
517
518         // get the host list
519         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
520         if(hostSet == null) {
521             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
522         }
523         switch (t) {
524         case ADDED:
525         case CHANGED:
526             hostSet.add(thisHost);
527             this.hostsDB.put(port, hostSet);
528             break;
529         case REMOVED:
530             hostSet.remove(thisHost);
531             if(hostSet.isEmpty()) {
532                 //remove only if hasn't been concurrently modified
533                 this.hostsDB.remove(port, hostSet);
534             } else {
535                 this.hostsDB.put(port, hostSet);
536             }
537             break;
538         }
539     }
540
541     private boolean nodeConnectorsExist(Edge e) {
542         NodeConnector head = e.getHeadNodeConnector();
543         NodeConnector tail = e.getTailNodeConnector();
544         return (switchManager.doesNodeConnectorExist(head) &&
545                 switchManager.doesNodeConnectorExist(tail));
546     }
547
548     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
549         switch (type) {
550         case ADDED:
551             // Ensure that both tail and head node connectors exist.
552             if (!nodeConnectorsExist(e)) {
553                 log.warn("Ignore edge that contains invalid node connector: {}", e);
554                 return null;
555             }
556
557             // Make sure the props are non-null
558             if (props == null) {
559                 props = new HashSet<Property>();
560             } else {
561                 props = new HashSet<Property>(props);
562             }
563
564             //in case of node switch-over to a different cluster controller,
565             //let's retain edge props
566             Set<Property> currentProps = this.edgesDB.get(e);
567             if (currentProps != null){
568                 props.addAll(currentProps);
569             }
570
571             // Now make sure there is the creation timestamp for the
572             // edge, if not there, stamp with the first update
573             boolean found_create = false;
574             for (Property prop : props) {
575                 if (prop instanceof TimeStamp) {
576                     TimeStamp t = (TimeStamp) prop;
577                     if (t.getTimeStampName().equals("creation")) {
578                         found_create = true;
579                         break;
580                     }
581                 }
582             }
583
584             if (!found_create) {
585                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
586                 props.add(t);
587             }
588
589             // Now add this in the database eventually overriding
590             // something that may have been already existing
591             this.edgesDB.put(e, props);
592
593             // Now populate the DB of NodeConnectors
594             // NOTE WELL: properties are empty sets, not really needed
595             // for now.
596             // The DB only contains ISL ports
597             if (isISLink(e)) {
598                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
599                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
600             }
601             log.trace("Edge {}  {}", e.toString(), type.name());
602             break;
603         case REMOVED:
604             // Now remove the edge from edgesDB
605             this.edgesDB.remove(e);
606
607             // Now lets update the NodeConnectors DB, the assumption
608             // here is that two NodeConnector are exclusively
609             // connected by 1 and only 1 edge, this is reasonable in
610             // the same plug (virtual of phisical) we can assume two
611             // cables won't be plugged. This could break only in case
612             // of devices in the middle that acts as hubs, but it
613             // should be safe to assume that won't happen.
614             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
615             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
616             log.trace("Edge {}  {}", e.toString(), type.name());
617             break;
618         case CHANGED:
619             Set<Property> oldProps = this.edgesDB.get(e);
620
621             // When property changes lets make sure we can change it
622             // all except the creation time stamp because that should
623             // be changed only when the edge is destroyed and created
624             // again
625             TimeStamp timeStamp = null;
626             for (Property prop : oldProps) {
627                 if (prop instanceof TimeStamp) {
628                     TimeStamp tsProp = (TimeStamp) prop;
629                     if (tsProp.getTimeStampName().equals("creation")) {
630                         timeStamp = tsProp;
631                         break;
632                     }
633                 }
634             }
635
636             // Now lets make sure new properties are non-null
637             if (props == null) {
638                 props = new HashSet<Property>();
639             } else {
640                 // Copy the set so noone is going to change the content
641                 props = new HashSet<Property>(props);
642             }
643
644             // Now lets remove the creation property if exist in the
645             // new props
646             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
647                 Property prop = i.next();
648                 if (prop instanceof TimeStamp) {
649                     TimeStamp t = (TimeStamp) prop;
650                     if (t.getTimeStampName().equals("creation")) {
651                         i.remove();
652                         break;
653                     }
654                 }
655             }
656
657             // Now lets add the creation timestamp in it
658             if (timeStamp != null) {
659                 props.add(timeStamp);
660             }
661
662             // Finally update
663             this.edgesDB.put(e, props);
664             log.trace("Edge {}  {}", e.toString(), type.name());
665             break;
666         }
667         return new TopoEdgeUpdate(e, props, type);
668     }
669
670     @Override
671     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
672         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
673         for (int i = 0; i < topoedgeupdateList.size(); i++) {
674             Edge e = topoedgeupdateList.get(i).getEdge();
675             Set<Property> p = topoedgeupdateList.get(i).getProperty();
676             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
677             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
678             if (teu != null) {
679                 teuList.add(teu);
680             }
681         }
682
683         if (!teuList.isEmpty()) {
684             // Now update the listeners
685             for (ITopologyManagerAware s : this.topologyManagerAware) {
686                 try {
687                     s.edgeUpdate(teuList);
688                 } catch (Exception exc) {
689                     log.error("Exception on edge update:", exc);
690                 }
691             }
692         }
693     }
694
695     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
696         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
697                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
698         return getLinkTuple(rLink);
699     }
700
701
702     private Edge getLinkTuple(TopologyUserLinkConfig link) {
703         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
704         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
705         try {
706             return new Edge(srcNodeConnector, dstNodeConnector);
707         } catch (Exception e) {
708             return null;
709         }
710     }
711
712     @Override
713     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
714         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
715     }
716
717     @Override
718     public Status addUserLink(TopologyUserLinkConfig userLink) {
719         if (!userLink.isValid()) {
720             return new Status(StatusCode.BADREQUEST,
721                     "User link configuration invalid.");
722         }
723         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
724
725         //Check if this link already configured
726         //NOTE: infinispan cache doesn't support Map.containsValue()
727         // (which is linear time in most ConcurrentMap impl anyway)
728         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
729             if (existingLink.equals(userLink)) {
730                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
731             }
732         }
733         //attempt put, if mapping for this key already existed return conflict
734         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
735             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
736                     + " already exists. Please use another name");
737         }
738
739         Edge linkTuple = getLinkTuple(userLink);
740         if (linkTuple != null) {
741             if (!isProductionLink(linkTuple)) {
742                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
743                                                 new HashSet<Property>());
744                 if (teu == null) {
745                     userLinksDB.remove(userLink.getName());
746                     return new Status(StatusCode.NOTFOUND,
747                            "Link configuration contains invalid node connector: "
748                            + userLink);
749                 }
750             }
751
752             linkTuple = getReverseLinkTuple(userLink);
753             if (linkTuple != null) {
754                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
755                 if (!isProductionLink(linkTuple)) {
756                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
757                 }
758             }
759         }
760         return new Status(StatusCode.SUCCESS);
761     }
762
763     @Override
764     public Status deleteUserLink(String linkName) {
765         if (linkName == null) {
766             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
767         }
768
769         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
770         Edge linkTuple;
771         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
772             if (! isProductionLink(linkTuple)) {
773                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
774             }
775
776             linkTuple = getReverseLinkTuple(link);
777             if (! isProductionLink(linkTuple)) {
778                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
779             }
780         }
781         return new Status(StatusCode.SUCCESS);
782     }
783
784     private void registerWithOSGIConsole() {
785         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
786                 .getBundleContext();
787         bundleContext.registerService(CommandProvider.class.getName(), this,
788                 null);
789     }
790
791     @Override
792     public String getHelp() {
793         StringBuffer help = new StringBuffer();
794         help.append("---Topology Manager---\n");
795         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
796         help.append("\t deleteUserLink <name>\n");
797         help.append("\t printUserLink\n");
798         help.append("\t printNodeEdges\n");
799         return help.toString();
800     }
801
802     public void _printUserLink(CommandInterpreter ci) {
803         for (String name : this.userLinksDB.keySet()) {
804             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
805             ci.println("Name : " + name);
806             ci.println(linkConfig);
807             ci.println("Edge " + getLinkTuple(linkConfig));
808             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
809         }
810     }
811
812     public void _addUserLink(CommandInterpreter ci) {
813         String name = ci.nextArgument();
814         if ((name == null)) {
815             ci.println("Please enter a valid Name");
816             return;
817         }
818
819         String ncStr1 = ci.nextArgument();
820         if (ncStr1 == null) {
821             ci.println("Please enter two node connector strings");
822             return;
823         }
824         String ncStr2 = ci.nextArgument();
825         if (ncStr2 == null) {
826             ci.println("Please enter second node connector string");
827             return;
828         }
829
830         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
831         if (nc1 == null) {
832             ci.println("Invalid input node connector 1 string: " + ncStr1);
833             return;
834         }
835         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
836         if (nc2 == null) {
837             ci.println("Invalid input node connector 2 string: " + ncStr2);
838             return;
839         }
840
841         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
842         ci.println(this.addUserLink(config));
843     }
844
845     public void _deleteUserLink(CommandInterpreter ci) {
846         String name = ci.nextArgument();
847         if ((name == null)) {
848             ci.println("Please enter a valid Name");
849             return;
850         }
851         this.deleteUserLink(name);
852     }
853
854     public void _printNodeEdges(CommandInterpreter ci) {
855         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
856         if (nodeEdges == null) {
857             return;
858         }
859         Set<Node> nodeSet = nodeEdges.keySet();
860         if (nodeSet == null) {
861             return;
862         }
863         ci.println("        Node                                         Edge");
864         for (Node node : nodeSet) {
865             Set<Edge> edgeSet = nodeEdges.get(node);
866             if (edgeSet == null) {
867                 continue;
868             }
869             for (Edge edge : edgeSet) {
870                 ci.println(node + "             " + edge);
871             }
872         }
873     }
874
875     @Override
876     public Object readObject(ObjectInputStream ois)
877             throws FileNotFoundException, IOException, ClassNotFoundException {
878         return ois.readObject();
879     }
880
881     @Override
882     public Status saveConfiguration() {
883         return saveConfig();
884     }
885
886     @Override
887     public void edgeOverUtilized(Edge edge) {
888         log.warn("Link Utilization above normal: {}", edge);
889     }
890
891     @Override
892     public void edgeUtilBackToNormal(Edge edge) {
893         log.warn("Link Utilization back to normal: {}", edge);
894     }
895
896     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
897         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
898         upd.setLocal(isLocal);
899         notifyQ.add(upd);
900     }
901
902     @Override
903     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
904         if (cacheName.equals(TOPOEDGESDB)) {
905             // This is the case of an Edge being added to the topology DB
906             final Edge e = (Edge) key;
907             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
908             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
909         }
910     }
911
912     @Override
913     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
914         if (cacheName.equals(TOPOEDGESDB)) {
915             final Edge e = (Edge) key;
916             log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
917             final Set<Property> props = (Set<Property>) new_value;
918             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
919         }
920     }
921
922     @Override
923     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
924         if (cacheName.equals(TOPOEDGESDB)) {
925             final Edge e = (Edge) key;
926             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
927             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
928         }
929     }
930
931     class TopologyNotify implements Runnable {
932         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
933         private TopoEdgeUpdate entry;
934         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
935         private boolean notifyListeners;
936
937         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
938             this.notifyQ = notifyQ;
939         }
940
941         @Override
942         public void run() {
943             while (true) {
944                 try {
945                     log.trace("New run of TopologyNotify");
946                     notifyListeners = false;
947                     // First we block waiting for an element to get in
948                     entry = notifyQ.take();
949                     // Then we drain the whole queue if elements are
950                     // in it without getting into any blocking condition
951                     for (; entry != null; entry = notifyQ.poll()) {
952                         teuList.add(entry);
953                         notifyListeners = true;
954                     }
955
956                     // Notify listeners only if there were updates drained else
957                     // give up
958                     if (notifyListeners) {
959                         log.trace("Notifier thread, notified a listener");
960                         // Now update the listeners
961                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
962                             try {
963                                 s.edgeUpdate(teuList);
964                             } catch (Exception exc) {
965                                 log.error("Exception on edge update:", exc);
966                             }
967                         }
968                     }
969                     teuList.clear();
970
971                     // Lets sleep for sometime to allow aggregation of event
972                     Thread.sleep(100);
973                 } catch (InterruptedException e1) {
974                     if (shuttingDown) {
975                         return;
976                     }
977                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
978                 } catch (Exception e2) {
979                     log.error("", e2);
980                 }
981             }
982         }
983     }
984 }