Allow Topology DB to keep track of production switches
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.switchmanager.ISwitchManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
60 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
61 import org.osgi.framework.BundleContext;
62 import org.osgi.framework.FrameworkUtil;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 /**
67  * The class describes TopologyManager which is the central repository of the
68  * network topology. It provides service for applications to interact with
69  * topology database and notifies all the listeners of topology changes.
70  */
71 public class TopologyManagerImpl implements
72         ICacheUpdateAware<Object, Object>,
73         ITopologyManager,
74         IConfigurationContainerAware,
75         IListenTopoUpdates,
76         IObjectReader,
77         CommandProvider {
78     static final String TOPOEDGESDB = "topologymanager.edgesDB";
79     static final String TOPOHOSTSDB = "topologymanager.hostsDB";
80     static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
81     static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
82     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
83     private ITopologyService topoService;
84     private IClusterContainerServices clusterContainerService;
85     private ISwitchManager switchManager;
86     // DB of all the Edges with properties which constitute our topology
87     private ConcurrentMap<Edge, Set<Property>> edgesDB;
88     // DB of all NodeConnector which are part of ISL Edges, meaning they
89     // are connected to another NodeConnector on the other side of an ISL link.
90     // NodeConnector of a Production Edge is not part of this DB.
91     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
92     // DB of all the NodeConnectors with an Host attached to it
93     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
94     // Topology Manager Aware listeners
95     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
96     // Topology Manager Aware listeners - for clusterwide updates
97     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
98             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
99     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
100     private String userLinksFileName;
101     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
102     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
103     private volatile Boolean shuttingDown = false;
104     private Thread notifyThread;
105
106
107     void nonClusterObjectCreate() {
108         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
109         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
110         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
111         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
112     }
113
114     void setTopologyManagerAware(ITopologyManagerAware s) {
115         if (this.topologyManagerAware != null) {
116             log.debug("Adding ITopologyManagerAware: {}", s);
117             this.topologyManagerAware.add(s);
118         }
119     }
120
121     void unsetTopologyManagerAware(ITopologyManagerAware s) {
122         if (this.topologyManagerAware != null) {
123             log.debug("Removing ITopologyManagerAware: {}", s);
124             this.topologyManagerAware.remove(s);
125         }
126     }
127
128     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
129         if (this.topologyManagerClusterWideAware != null) {
130             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
131             this.topologyManagerClusterWideAware.add(s);
132         }
133     }
134
135     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
136         if (this.topologyManagerClusterWideAware != null) {
137             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
138             this.topologyManagerClusterWideAware.remove(s);
139         }
140     }
141
142     void setTopoService(ITopologyService s) {
143         log.debug("Adding ITopologyService: {}", s);
144         this.topoService = s;
145     }
146
147     void unsetTopoService(ITopologyService s) {
148         if (this.topoService == s) {
149             log.debug("Removing ITopologyService: {}", s);
150             this.topoService = null;
151         }
152     }
153
154     void setClusterContainerService(IClusterContainerServices s) {
155         log.debug("Cluster Service set");
156         this.clusterContainerService = s;
157     }
158
159     void unsetClusterContainerService(IClusterContainerServices s) {
160         if (this.clusterContainerService == s) {
161             log.debug("Cluster Service removed!");
162             this.clusterContainerService = null;
163         }
164     }
165
166     void setSwitchManager(ISwitchManager s) {
167         log.debug("Adding ISwitchManager: {}", s);
168         this.switchManager = s;
169     }
170
171     void unsetSwitchManager(ISwitchManager s) {
172         if (this.switchManager == s) {
173             log.debug("Removing ISwitchManager: {}", s);
174             this.switchManager = null;
175         }
176     }
177
178     /**
179      * Function called by the dependency manager when all the required
180      * dependencies are satisfied
181      *
182      */
183     void init(Component c) {
184         allocateCaches();
185         retrieveCaches();
186         String containerName = null;
187         Dictionary<?, ?> props = c.getServiceProperties();
188         if (props != null) {
189             containerName = (String) props.get("containerName");
190         } else {
191             // In the Global instance case the containerName is empty
192             containerName = "UNKNOWN";
193         }
194
195         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
196         registerWithOSGIConsole();
197         loadConfiguration();
198         // Restore the shuttingDown status on init of the component
199         shuttingDown = false;
200         notifyThread = new Thread(new TopologyNotify(notifyQ));
201     }
202
203     @SuppressWarnings({ "unchecked" })
204     private void allocateCaches() {
205         try {
206             this.edgesDB =
207                     (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
208                             EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
209         } catch (CacheExistException cee) {
210             log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
211         } catch (CacheConfigException cce) {
212             log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
213         }
214
215         try {
216             this.hostsDB =
217                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
218                             TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
219         } catch (CacheExistException cee) {
220             log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
221         } catch (CacheConfigException cce) {
222             log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
223         }
224
225         try {
226             this.nodeConnectorsDB =
227                     (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
228                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
229         } catch (CacheExistException cee) {
230             log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
231         } catch (CacheConfigException cce) {
232             log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
233         }
234
235         try {
236             this.userLinksDB =
237                     (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
238                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
239         } catch (CacheExistException cee) {
240             log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
241         } catch (CacheConfigException cce) {
242             log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
243         }
244     }
245
246     @SuppressWarnings({ "unchecked" })
247     private void retrieveCaches() {
248         if (this.clusterContainerService == null) {
249             log.error("Cluster Services is null, can't retrieve caches.");
250             return;
251         }
252
253         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
254         if (edgesDB == null) {
255             log.error("Failed to get cache for " + TOPOEDGESDB);
256         }
257
258         this.hostsDB =
259                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
260         if (hostsDB == null) {
261             log.error("Failed to get cache for " + TOPOHOSTSDB);
262         }
263
264         this.nodeConnectorsDB =
265                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
266         if (nodeConnectorsDB == null) {
267             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
268         }
269
270         this.userLinksDB =
271                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
272         if (userLinksDB == null) {
273             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
274         }
275     }
276
277     /**
278      * Function called after the topology manager has registered the service in
279      * OSGi service registry.
280      *
281      */
282     void started() {
283         // Start the batcher thread for the cluster wide topology updates
284         notifyThread.start();
285         // SollicitRefresh MUST be called here else if called at init
286         // time it may sollicit refresh too soon.
287         log.debug("Sollicit topology refresh");
288         topoService.sollicitRefresh();
289     }
290
291     void stop() {
292         shuttingDown = true;
293         notifyThread.interrupt();
294     }
295
296     /**
297      * Function called by the dependency manager when at least one dependency
298      * become unsatisfied or when the component is shutting down because for
299      * example bundle is being stopped.
300      *
301      */
302     void destroy() {
303         notifyQ.clear();
304         notifyThread = null;
305     }
306
307     @SuppressWarnings("unchecked")
308     private void loadConfiguration() {
309         ObjectReader objReader = new ObjectReader();
310         ConcurrentMap<String, TopologyUserLinkConfig> confList =
311                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
312
313         if (confList != null) {
314             for (TopologyUserLinkConfig conf : confList.values()) {
315                 addUserLink(conf);
316             }
317         }
318     }
319
320     @Override
321     public Status saveConfig() {
322         return saveConfigInternal();
323     }
324
325     public Status saveConfigInternal() {
326         ObjectWriter objWriter = new ObjectWriter();
327
328         Status saveStatus = objWriter.write(
329                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
330
331         if (! saveStatus.isSuccess()) {
332             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
333         }
334         return saveStatus;
335     }
336
337     @Override
338     public Map<Node, Set<Edge>> getNodeEdges() {
339         if (this.edgesDB == null) {
340             return null;
341         }
342
343         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
344         for (Edge edge : this.edgesDB.keySet()) {
345             // Lets analyze the tail
346             Node node = edge.getTailNodeConnector().getNode();
347             Set<Edge> nodeEdges = res.get(node);
348             if (nodeEdges == null) {
349                 nodeEdges = new HashSet<Edge>();
350                 res.put(node, nodeEdges);
351             }
352             nodeEdges.add(edge);
353
354             // Lets analyze the head
355             node = edge.getHeadNodeConnector().getNode();
356             nodeEdges = res.get(node);
357             if (nodeEdges == null) {
358                 nodeEdges = new HashSet<Edge>();
359                 res.put(node, nodeEdges);
360             }
361             nodeEdges.add(edge);
362         }
363
364         return res;
365     }
366
367     @Override
368     public boolean isInternal(NodeConnector p) {
369         if (this.nodeConnectorsDB == null) {
370             return false;
371         }
372
373         // This is an internal NodeConnector if is connected to
374         // another Node i.e it's part of the nodeConnectorsDB
375         return (this.nodeConnectorsDB.get(p) != null);
376     }
377
378     /**
379      * This method returns true if the edge is an ISL link.
380      *
381      * @param e
382      *            The edge
383      * @return true if it is an ISL link
384      */
385     public boolean isISLink(Edge e) {
386         return (!isProductionLink(e));
387     }
388
389     /**
390      * This method returns true if the edge is a production link.
391      *
392      * @param e
393      *            The edge
394      * @return true if it is a production link
395      */
396     public boolean isProductionLink(Edge e) {
397         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
398                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
399     }
400
401     /**
402      * The Map returned is a copy of the current topology hence if the topology
403      * changes the copy doesn't
404      *
405      * @return A Map representing the current topology expressed as edges of the
406      *         network
407      */
408     @Override
409     public Map<Edge, Set<Property>> getEdges() {
410         if (this.edgesDB == null) {
411             return null;
412         }
413
414         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
415         Set<Property> props;
416         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
417             // Sets of props are copied because the composition of
418             // those properties could change with time
419             props = new HashSet<Property>(edgeEntry.getValue());
420             // We can simply reuse the key because the object is
421             // immutable so doesn't really matter that we are
422             // referencing the only owned by a different table, the
423             // meaning is the same because doesn't change with time.
424             edgeMap.put(edgeEntry.getKey(), props);
425         }
426
427         return edgeMap;
428     }
429
430     @Override
431     public Set<NodeConnector> getNodeConnectorWithHost() {
432         if (this.hostsDB == null) {
433             return null;
434         }
435
436         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
437     }
438
439     @Override
440     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
441         if (this.hostsDB == null) {
442             return null;
443         }
444         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
445         Node node;
446         Set<NodeConnector> portSet;
447         for (NodeConnector nc : this.hostsDB.keySet()) {
448             node = nc.getNode();
449             portSet = res.get(node);
450             if (portSet == null) {
451                 // Create the HashSet if null
452                 portSet = new HashSet<NodeConnector>();
453                 res.put(node, portSet);
454             }
455
456             // Keep updating the HashSet, given this is not a
457             // clustered map we can just update the set without
458             // worrying to update the hashmap.
459             portSet.add(nc);
460         }
461
462         return (res);
463     }
464
465     @Override
466     public Host getHostAttachedToNodeConnector(NodeConnector port) {
467         List<Host> hosts = getHostsAttachedToNodeConnector(port);
468         if(hosts != null && !hosts.isEmpty()){
469             return hosts.get(0);
470         }
471         return null;
472     }
473
474     @Override
475     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
476         Set<ImmutablePair<Host, Set<Property>>> hosts;
477         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
478             return null;
479         }
480         // create a list of hosts
481         List<Host> retHosts = new LinkedList<Host>();
482         for(ImmutablePair<Host, Set<Property>> host : hosts) {
483             retHosts.add(host.getLeft());
484         }
485         return retHosts;
486     }
487
488     @Override
489     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
490
491         // Clone the property set in case non null else just
492         // create an empty one. Caches allocated via infinispan
493         // don't allow null values
494         if (props == null) {
495             props = new HashSet<Property>();
496         } else {
497             props = new HashSet<Property>(props);
498         }
499         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
500
501         // get the host list
502         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
503         if(hostSet == null) {
504             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
505         }
506         switch (t) {
507         case ADDED:
508         case CHANGED:
509             hostSet.add(thisHost);
510             this.hostsDB.put(port, hostSet);
511             break;
512         case REMOVED:
513             hostSet.remove(thisHost);
514             if(hostSet.isEmpty()) {
515                 //remove only if hasn't been concurrently modified
516                 this.hostsDB.remove(port, hostSet);
517             } else {
518                 this.hostsDB.put(port, hostSet);
519             }
520             break;
521         }
522     }
523
524     private boolean headNodeConnectorExist(Edge e) {
525         /*
526          * Only check the head end point which is supposed to be part of a
527          * network node we control (present in our inventory). If we checked the
528          * tail end point as well, we would not store the edges that connect to
529          * a non sdn enable port on a non sdn capable production switch. We want
530          * to be able to see these switches on the topology.
531          */
532         NodeConnector head = e.getHeadNodeConnector();
533         return (switchManager.doesNodeConnectorExist(head));
534     }
535
536     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
537         switch (type) {
538         case ADDED:
539             // Avoid redundant update as notifications trigger expensive tasks
540             if (edgesDB.containsKey(e)) {
541                 log.trace("Skipping redundant edge addition: {}", e);
542                 return null;
543             }
544
545             // Ensure that head node connector exists
546             if (!headNodeConnectorExist(e)) {
547                 log.warn("Ignore edge that contains invalid node connector: {}", e);
548                 return null;
549             }
550
551             // Make sure the props are non-null
552             if (props == null) {
553                 props = new HashSet<Property>();
554             } else {
555                 props = new HashSet<Property>(props);
556             }
557
558             //in case of node switch-over to a different cluster controller,
559             //let's retain edge props
560             Set<Property> currentProps = this.edgesDB.get(e);
561             if (currentProps != null){
562                 props.addAll(currentProps);
563             }
564
565             // Now make sure there is the creation timestamp for the
566             // edge, if not there, stamp with the first update
567             boolean found_create = false;
568             for (Property prop : props) {
569                 if (prop instanceof TimeStamp) {
570                     TimeStamp t = (TimeStamp) prop;
571                     if (t.getTimeStampName().equals("creation")) {
572                         found_create = true;
573                         break;
574                     }
575                 }
576             }
577
578             if (!found_create) {
579                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
580                 props.add(t);
581             }
582
583             // Now add this in the database eventually overriding
584             // something that may have been already existing
585             this.edgesDB.put(e, props);
586
587             // Now populate the DB of NodeConnectors
588             // NOTE WELL: properties are empty sets, not really needed
589             // for now.
590             // The DB only contains ISL ports
591             if (isISLink(e)) {
592                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
593                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
594             }
595             log.trace("Edge {}  {}", e.toString(), type.name());
596             break;
597         case REMOVED:
598             // Now remove the edge from edgesDB
599             this.edgesDB.remove(e);
600
601             // Now lets update the NodeConnectors DB, the assumption
602             // here is that two NodeConnector are exclusively
603             // connected by 1 and only 1 edge, this is reasonable in
604             // the same plug (virtual of phisical) we can assume two
605             // cables won't be plugged. This could break only in case
606             // of devices in the middle that acts as hubs, but it
607             // should be safe to assume that won't happen.
608             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
609             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
610             log.trace("Edge {}  {}", e.toString(), type.name());
611             break;
612         case CHANGED:
613             Set<Property> oldProps = this.edgesDB.get(e);
614
615             // When property changes lets make sure we can change it
616             // all except the creation time stamp because that should
617             // be changed only when the edge is destroyed and created
618             // again
619             TimeStamp timeStamp = null;
620             for (Property prop : oldProps) {
621                 if (prop instanceof TimeStamp) {
622                     TimeStamp tsProp = (TimeStamp) prop;
623                     if (tsProp.getTimeStampName().equals("creation")) {
624                         timeStamp = tsProp;
625                         break;
626                     }
627                 }
628             }
629
630             // Now lets make sure new properties are non-null
631             if (props == null) {
632                 props = new HashSet<Property>();
633             } else {
634                 // Copy the set so noone is going to change the content
635                 props = new HashSet<Property>(props);
636             }
637
638             // Now lets remove the creation property if exist in the
639             // new props
640             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
641                 Property prop = i.next();
642                 if (prop instanceof TimeStamp) {
643                     TimeStamp t = (TimeStamp) prop;
644                     if (t.getTimeStampName().equals("creation")) {
645                         i.remove();
646                         break;
647                     }
648                 }
649             }
650
651             // Now lets add the creation timestamp in it
652             if (timeStamp != null) {
653                 props.add(timeStamp);
654             }
655
656             // Finally update
657             this.edgesDB.put(e, props);
658             log.trace("Edge {}  {}", e.toString(), type.name());
659             break;
660         }
661         return new TopoEdgeUpdate(e, props, type);
662     }
663
664     @Override
665     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
666         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
667         for (int i = 0; i < topoedgeupdateList.size(); i++) {
668             Edge e = topoedgeupdateList.get(i).getEdge();
669             Set<Property> p = topoedgeupdateList.get(i).getProperty();
670             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
671             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
672             if (teu != null) {
673                 teuList.add(teu);
674             }
675         }
676
677         if (!teuList.isEmpty()) {
678             // Now update the listeners
679             for (ITopologyManagerAware s : this.topologyManagerAware) {
680                 try {
681                     s.edgeUpdate(teuList);
682                 } catch (Exception exc) {
683                     log.error("Exception on edge update:", exc);
684                 }
685             }
686         }
687     }
688
689     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
690         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
691                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
692         return getLinkTuple(rLink);
693     }
694
695
696     private Edge getLinkTuple(TopologyUserLinkConfig link) {
697         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
698         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
699         try {
700             return new Edge(srcNodeConnector, dstNodeConnector);
701         } catch (Exception e) {
702             return null;
703         }
704     }
705
706     @Override
707     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
708         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
709     }
710
711     @Override
712     public Status addUserLink(TopologyUserLinkConfig userLink) {
713         if (!userLink.isValid()) {
714             return new Status(StatusCode.BADREQUEST,
715                     "User link configuration invalid.");
716         }
717         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
718
719         //Check if this link already configured
720         //NOTE: infinispan cache doesn't support Map.containsValue()
721         // (which is linear time in most ConcurrentMap impl anyway)
722         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
723             if (existingLink.equals(userLink)) {
724                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
725             }
726         }
727         //attempt put, if mapping for this key already existed return conflict
728         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
729             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
730                     + " already exists. Please use another name");
731         }
732
733         Edge linkTuple = getLinkTuple(userLink);
734         if (linkTuple != null) {
735             if (!isProductionLink(linkTuple)) {
736                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
737                                                 new HashSet<Property>());
738                 if (teu == null) {
739                     userLinksDB.remove(userLink.getName());
740                     return new Status(StatusCode.NOTFOUND,
741                            "Link configuration contains invalid node connector: "
742                            + userLink);
743                 }
744             }
745
746             linkTuple = getReverseLinkTuple(userLink);
747             if (linkTuple != null) {
748                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
749                 if (!isProductionLink(linkTuple)) {
750                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
751                 }
752             }
753         }
754         return new Status(StatusCode.SUCCESS);
755     }
756
757     @Override
758     public Status deleteUserLink(String linkName) {
759         if (linkName == null) {
760             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
761         }
762
763         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
764         Edge linkTuple;
765         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
766             if (! isProductionLink(linkTuple)) {
767                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
768             }
769
770             linkTuple = getReverseLinkTuple(link);
771             if (! isProductionLink(linkTuple)) {
772                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
773             }
774         }
775         return new Status(StatusCode.SUCCESS);
776     }
777
778     private void registerWithOSGIConsole() {
779         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
780                 .getBundleContext();
781         bundleContext.registerService(CommandProvider.class.getName(), this,
782                 null);
783     }
784
785     @Override
786     public String getHelp() {
787         StringBuffer help = new StringBuffer();
788         help.append("---Topology Manager---\n");
789         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
790         help.append("\t deleteUserLink <name>\n");
791         help.append("\t printUserLink\n");
792         help.append("\t printNodeEdges\n");
793         return help.toString();
794     }
795
796     public void _printUserLink(CommandInterpreter ci) {
797         for (String name : this.userLinksDB.keySet()) {
798             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
799             ci.println("Name : " + name);
800             ci.println(linkConfig);
801             ci.println("Edge " + getLinkTuple(linkConfig));
802             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
803         }
804     }
805
806     public void _addUserLink(CommandInterpreter ci) {
807         String name = ci.nextArgument();
808         if ((name == null)) {
809             ci.println("Please enter a valid Name");
810             return;
811         }
812
813         String ncStr1 = ci.nextArgument();
814         if (ncStr1 == null) {
815             ci.println("Please enter two node connector strings");
816             return;
817         }
818         String ncStr2 = ci.nextArgument();
819         if (ncStr2 == null) {
820             ci.println("Please enter second node connector string");
821             return;
822         }
823
824         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
825         if (nc1 == null) {
826             ci.println("Invalid input node connector 1 string: " + ncStr1);
827             return;
828         }
829         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
830         if (nc2 == null) {
831             ci.println("Invalid input node connector 2 string: " + ncStr2);
832             return;
833         }
834
835         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
836         ci.println(this.addUserLink(config));
837     }
838
839     public void _deleteUserLink(CommandInterpreter ci) {
840         String name = ci.nextArgument();
841         if ((name == null)) {
842             ci.println("Please enter a valid Name");
843             return;
844         }
845         this.deleteUserLink(name);
846     }
847
848     public void _printNodeEdges(CommandInterpreter ci) {
849         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
850         if (nodeEdges == null) {
851             return;
852         }
853         Set<Node> nodeSet = nodeEdges.keySet();
854         if (nodeSet == null) {
855             return;
856         }
857         ci.println("        Node                                         Edge");
858         for (Node node : nodeSet) {
859             Set<Edge> edgeSet = nodeEdges.get(node);
860             if (edgeSet == null) {
861                 continue;
862             }
863             for (Edge edge : edgeSet) {
864                 ci.println(node + "             " + edge);
865             }
866         }
867     }
868
869     @Override
870     public Object readObject(ObjectInputStream ois)
871             throws FileNotFoundException, IOException, ClassNotFoundException {
872         return ois.readObject();
873     }
874
875     @Override
876     public Status saveConfiguration() {
877         return saveConfig();
878     }
879
880     @Override
881     public void edgeOverUtilized(Edge edge) {
882         log.warn("Link Utilization above normal: {}", edge);
883     }
884
885     @Override
886     public void edgeUtilBackToNormal(Edge edge) {
887         log.warn("Link Utilization back to normal: {}", edge);
888     }
889
890     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
891         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
892         upd.setLocal(isLocal);
893         notifyQ.add(upd);
894     }
895
896     @Override
897     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
898         if (cacheName.equals(TOPOEDGESDB)) {
899             // This is the case of an Edge being added to the topology DB
900             final Edge e = (Edge) key;
901             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
902             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
903         }
904     }
905
906     @Override
907     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
908         if (cacheName.equals(TOPOEDGESDB)) {
909             final Edge e = (Edge) key;
910             log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
911             final Set<Property> props = (Set<Property>) new_value;
912             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
913         }
914     }
915
916     @Override
917     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
918         if (cacheName.equals(TOPOEDGESDB)) {
919             final Edge e = (Edge) key;
920             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
921             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
922         }
923     }
924
925     class TopologyNotify implements Runnable {
926         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
927         private TopoEdgeUpdate entry;
928         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
929         private boolean notifyListeners;
930
931         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
932             this.notifyQ = notifyQ;
933         }
934
935         @Override
936         public void run() {
937             while (true) {
938                 try {
939                     log.trace("New run of TopologyNotify");
940                     notifyListeners = false;
941                     // First we block waiting for an element to get in
942                     entry = notifyQ.take();
943                     // Then we drain the whole queue if elements are
944                     // in it without getting into any blocking condition
945                     for (; entry != null; entry = notifyQ.poll()) {
946                         teuList.add(entry);
947                         notifyListeners = true;
948                     }
949
950                     // Notify listeners only if there were updates drained else
951                     // give up
952                     if (notifyListeners) {
953                         log.trace("Notifier thread, notified a listener");
954                         // Now update the listeners
955                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
956                             try {
957                                 s.edgeUpdate(teuList);
958                             } catch (Exception exc) {
959                                 log.error("Exception on edge update:", exc);
960                             }
961                         }
962                     }
963                     teuList.clear();
964
965                     // Lets sleep for sometime to allow aggregation of event
966                     Thread.sleep(100);
967                 } catch (InterruptedException e1) {
968                     if (shuttingDown) {
969                         return;
970                     }
971                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
972                 } catch (Exception e2) {
973                     log.error("", e2);
974                 }
975             }
976         }
977     }
978 }