Merge "Remove unused class member SAVE"
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.topologymanager.ITopologyManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
59 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * The class describes TopologyManager which is the central repository of the
67  * network topology. It provides service for applications to interact with
68  * topology database and notifies all the listeners of topology changes.
69  */
70 public class TopologyManagerImpl implements
71         ICacheUpdateAware,
72         ITopologyManager,
73         IConfigurationContainerAware,
74         IListenTopoUpdates,
75         IObjectReader,
76         CommandProvider {
77     static final String TOPOEDGESDB = "topologymanager.edgesDB";
78     static final String TOPOHOSTSDB = "topologymanager.hostsDB";
79     static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
80     static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
81     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
82     private ITopologyService topoService;
83     private IClusterContainerServices clusterContainerService;
84     // DB of all the Edges with properties which constitute our topology
85     private ConcurrentMap<Edge, Set<Property>> edgesDB;
86     // DB of all NodeConnector which are part of ISL Edges, meaning they
87     // are connected to another NodeConnector on the other side of an ISL link.
88     // NodeConnector of a Production Edge is not part of this DB.
89     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
90     // DB of all the NodeConnectors with an Host attached to it
91     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
92     // Topology Manager Aware listeners
93     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
94     // Topology Manager Aware listeners - for clusterwide updates
95     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
96             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
97     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
98     private String userLinksFileName;
99     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
100     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
101     private volatile Boolean shuttingDown = false;
102     private Thread notifyThread;
103
104
105     void nonClusterObjectCreate() {
106         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
107         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
108         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
109         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
110     }
111
112     void setTopologyManagerAware(ITopologyManagerAware s) {
113         if (this.topologyManagerAware != null) {
114             log.debug("Adding ITopologyManagerAware: {}", s);
115             this.topologyManagerAware.add(s);
116         }
117     }
118
119     void unsetTopologyManagerAware(ITopologyManagerAware s) {
120         if (this.topologyManagerAware != null) {
121             log.debug("Removing ITopologyManagerAware: {}", s);
122             this.topologyManagerAware.remove(s);
123         }
124     }
125
126     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
127         if (this.topologyManagerClusterWideAware != null) {
128             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
129             this.topologyManagerClusterWideAware.add(s);
130         }
131     }
132
133     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
134         if (this.topologyManagerClusterWideAware != null) {
135             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
136             this.topologyManagerClusterWideAware.remove(s);
137         }
138     }
139
140     void setTopoService(ITopologyService s) {
141         log.debug("Adding ITopologyService: {}", s);
142         this.topoService = s;
143     }
144
145     void unsetTopoService(ITopologyService s) {
146         if (this.topoService == s) {
147             log.debug("Removing ITopologyService: {}", s);
148             this.topoService = null;
149         }
150     }
151
152     void setClusterContainerService(IClusterContainerServices s) {
153         log.debug("Cluster Service set");
154         this.clusterContainerService = s;
155     }
156
157     void unsetClusterContainerService(IClusterContainerServices s) {
158         if (this.clusterContainerService == s) {
159             log.debug("Cluster Service removed!");
160             this.clusterContainerService = null;
161         }
162     }
163
164     /**
165      * Function called by the dependency manager when all the required
166      * dependencies are satisfied
167      *
168      */
169     void init(Component c) {
170         allocateCaches();
171         retrieveCaches();
172         String containerName = null;
173         Dictionary<?, ?> props = c.getServiceProperties();
174         if (props != null) {
175             containerName = (String) props.get("containerName");
176         } else {
177             // In the Global instance case the containerName is empty
178             containerName = "UNKNOWN";
179         }
180
181         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
182         registerWithOSGIConsole();
183         loadConfiguration();
184         // Restore the shuttingDown status on init of the component
185         shuttingDown = false;
186         notifyThread = new Thread(new TopologyNotify(notifyQ));
187     }
188
189     @SuppressWarnings({ "unchecked", "deprecation" })
190     private void allocateCaches() {
191         try {
192             this.edgesDB =
193                     (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
194                             EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
195         } catch (CacheExistException cee) {
196             log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
197         } catch (CacheConfigException cce) {
198             log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
199         }
200
201         try {
202             this.hostsDB =
203                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
204                             TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
205         } catch (CacheExistException cee) {
206             log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
207         } catch (CacheConfigException cce) {
208             log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
209         }
210
211         try {
212             this.nodeConnectorsDB =
213                     (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
214                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
215         } catch (CacheExistException cee) {
216             log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
217         } catch (CacheConfigException cce) {
218             log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
219         }
220
221         try {
222             this.userLinksDB =
223                     (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
224                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
225         } catch (CacheExistException cee) {
226             log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
227         } catch (CacheConfigException cce) {
228             log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
229         }
230     }
231
232     @SuppressWarnings({ "unchecked", "deprecation" })
233     private void retrieveCaches() {
234         if (this.clusterContainerService == null) {
235             log.error("Cluster Services is null, can't retrieve caches.");
236             return;
237         }
238
239         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
240         if (edgesDB == null) {
241             log.error("Failed to get cache for " + TOPOEDGESDB);
242         }
243
244         this.hostsDB =
245                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
246         if (hostsDB == null) {
247             log.error("Failed to get cache for " + TOPOHOSTSDB);
248         }
249
250         this.nodeConnectorsDB =
251                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
252         if (nodeConnectorsDB == null) {
253             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
254         }
255
256         this.userLinksDB =
257                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
258         if (userLinksDB == null) {
259             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
260         }
261     }
262
263     /**
264      * Function called after the topology manager has registered the service in
265      * OSGi service registry.
266      *
267      */
268     void started() {
269         // Start the batcher thread for the cluster wide topology updates
270         notifyThread.start();
271         // SollicitRefresh MUST be called here else if called at init
272         // time it may sollicit refresh too soon.
273         log.debug("Sollicit topology refresh");
274         topoService.sollicitRefresh();
275     }
276
277     void stop() {
278         shuttingDown = true;
279         notifyThread.interrupt();
280     }
281
282     /**
283      * Function called by the dependency manager when at least one dependency
284      * become unsatisfied or when the component is shutting down because for
285      * example bundle is being stopped.
286      *
287      */
288     void destroy() {
289         notifyQ.clear();
290         notifyThread = null;
291     }
292
293     @SuppressWarnings("unchecked")
294     private void loadConfiguration() {
295         ObjectReader objReader = new ObjectReader();
296         ConcurrentMap<String, TopologyUserLinkConfig> confList =
297                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
298
299         if (confList != null) {
300             for (TopologyUserLinkConfig conf : confList.values()) {
301                 addUserLink(conf);
302             }
303         }
304     }
305
306     @Override
307     public Status saveConfig() {
308         return saveConfigInternal();
309     }
310
311     public Status saveConfigInternal() {
312         ObjectWriter objWriter = new ObjectWriter();
313
314         Status saveStatus = objWriter.write(
315                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
316
317         if (! saveStatus.isSuccess()) {
318             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
319         }
320         return saveStatus;
321     }
322
323     @Override
324     public Map<Node, Set<Edge>> getNodeEdges() {
325         if (this.edgesDB == null) {
326             return null;
327         }
328
329         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
330         for (Edge edge : this.edgesDB.keySet()) {
331             // Lets analyze the tail
332             Node node = edge.getTailNodeConnector().getNode();
333             Set<Edge> nodeEdges = res.get(node);
334             if (nodeEdges == null) {
335                 nodeEdges = new HashSet<Edge>();
336                 res.put(node, nodeEdges);
337             }
338             nodeEdges.add(edge);
339
340             // Lets analyze the head
341             node = edge.getHeadNodeConnector().getNode();
342             nodeEdges = res.get(node);
343             if (nodeEdges == null) {
344                 nodeEdges = new HashSet<Edge>();
345                 res.put(node, nodeEdges);
346             }
347             nodeEdges.add(edge);
348         }
349
350         return res;
351     }
352
353     @Override
354     public boolean isInternal(NodeConnector p) {
355         if (this.nodeConnectorsDB == null) {
356             return false;
357         }
358
359         // This is an internal NodeConnector if is connected to
360         // another Node i.e it's part of the nodeConnectorsDB
361         return (this.nodeConnectorsDB.get(p) != null);
362     }
363
364     /**
365      * This method returns true if the edge is an ISL link.
366      *
367      * @param e
368      *            The edge
369      * @return true if it is an ISL link
370      */
371     public boolean isISLink(Edge e) {
372         return (!isProductionLink(e));
373     }
374
375     /**
376      * This method returns true if the edge is a production link.
377      *
378      * @param e
379      *            The edge
380      * @return true if it is a production link
381      */
382     public boolean isProductionLink(Edge e) {
383         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
384                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
385     }
386
387     /**
388      * The Map returned is a copy of the current topology hence if the topology
389      * changes the copy doesn't
390      *
391      * @return A Map representing the current topology expressed as edges of the
392      *         network
393      */
394     @Override
395     public Map<Edge, Set<Property>> getEdges() {
396         if (this.edgesDB == null) {
397             return null;
398         }
399
400         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
401         Set<Property> props;
402         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
403             // Sets of props are copied because the composition of
404             // those properties could change with time
405             props = new HashSet<Property>(edgeEntry.getValue());
406             // We can simply reuse the key because the object is
407             // immutable so doesn't really matter that we are
408             // referencing the only owned by a different table, the
409             // meaning is the same because doesn't change with time.
410             edgeMap.put(edgeEntry.getKey(), props);
411         }
412
413         return edgeMap;
414     }
415
416     @Override
417     public Set<NodeConnector> getNodeConnectorWithHost() {
418         if (this.hostsDB == null) {
419             return null;
420         }
421
422         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
423     }
424
425     @Override
426     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
427         if (this.hostsDB == null) {
428             return null;
429         }
430         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
431         Node node;
432         Set<NodeConnector> portSet;
433         for (NodeConnector nc : this.hostsDB.keySet()) {
434             node = nc.getNode();
435             portSet = res.get(node);
436             if (portSet == null) {
437                 // Create the HashSet if null
438                 portSet = new HashSet<NodeConnector>();
439                 res.put(node, portSet);
440             }
441
442             // Keep updating the HashSet, given this is not a
443             // clustered map we can just update the set without
444             // worrying to update the hashmap.
445             portSet.add(nc);
446         }
447
448         return (res);
449     }
450
451     @Override
452     public Host getHostAttachedToNodeConnector(NodeConnector port) {
453         List<Host> hosts = getHostsAttachedToNodeConnector(port);
454         if(hosts != null && !hosts.isEmpty()){
455             return hosts.get(0);
456         }
457         return null;
458     }
459
460     @Override
461     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
462         Set<ImmutablePair<Host, Set<Property>>> hosts;
463         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
464             return null;
465         }
466         // create a list of hosts
467         List<Host> retHosts = new LinkedList<Host>();
468         for(ImmutablePair<Host, Set<Property>> host : hosts) {
469             retHosts.add(host.getLeft());
470         }
471         return retHosts;
472     }
473
474     @Override
475     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
476
477         // Clone the property set in case non null else just
478         // create an empty one. Caches allocated via infinispan
479         // don't allow null values
480         if (props == null) {
481             props = new HashSet<Property>();
482         } else {
483             props = new HashSet<Property>(props);
484         }
485         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
486
487         // get the host list
488         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
489         if(hostSet == null) {
490             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
491         }
492         switch (t) {
493         case ADDED:
494         case CHANGED:
495             hostSet.add(thisHost);
496             this.hostsDB.put(port, hostSet);
497             break;
498         case REMOVED:
499             hostSet.remove(thisHost);
500             if(hostSet.isEmpty()) {
501                 //remove only if hasn't been concurrently modified
502                 this.hostsDB.remove(port, hostSet);
503             } else {
504                 this.hostsDB.put(port, hostSet);
505             }
506             break;
507         }
508     }
509
510     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
511         switch (type) {
512         case ADDED:
513             // Make sure the props are non-null
514             if (props == null) {
515                 props = new HashSet<Property>();
516             } else {
517                 props = new HashSet<Property>(props);
518             }
519
520             //in case of node switch-over to a different cluster controller,
521             //let's retain edge props
522             Set<Property> currentProps = this.edgesDB.get(e);
523             if (currentProps != null){
524                 props.addAll(currentProps);
525             }
526
527             // Now make sure there is the creation timestamp for the
528             // edge, if not there, stamp with the first update
529             boolean found_create = false;
530             for (Property prop : props) {
531                 if (prop instanceof TimeStamp) {
532                     TimeStamp t = (TimeStamp) prop;
533                     if (t.getTimeStampName().equals("creation")) {
534                         found_create = true;
535                         break;
536                     }
537                 }
538             }
539
540             if (!found_create) {
541                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
542                 props.add(t);
543             }
544
545             // Now add this in the database eventually overriding
546             // something that may have been already existing
547             this.edgesDB.put(e, props);
548
549             // Now populate the DB of NodeConnectors
550             // NOTE WELL: properties are empty sets, not really needed
551             // for now.
552             // The DB only contains ISL ports
553             if (isISLink(e)) {
554                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
555                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
556             }
557             log.trace("Edge {}  {}", e.toString(), type.name());
558             break;
559         case REMOVED:
560             // Now remove the edge from edgesDB
561             this.edgesDB.remove(e);
562
563             // Now lets update the NodeConnectors DB, the assumption
564             // here is that two NodeConnector are exclusively
565             // connected by 1 and only 1 edge, this is reasonable in
566             // the same plug (virtual of phisical) we can assume two
567             // cables won't be plugged. This could break only in case
568             // of devices in the middle that acts as hubs, but it
569             // should be safe to assume that won't happen.
570             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
571             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
572             log.trace("Edge {}  {}", e.toString(), type.name());
573             break;
574         case CHANGED:
575             Set<Property> oldProps = this.edgesDB.get(e);
576
577             // When property changes lets make sure we can change it
578             // all except the creation time stamp because that should
579             // be changed only when the edge is destroyed and created
580             // again
581             TimeStamp timeStamp = null;
582             for (Property prop : oldProps) {
583                 if (prop instanceof TimeStamp) {
584                     TimeStamp tsProp = (TimeStamp) prop;
585                     if (tsProp.getTimeStampName().equals("creation")) {
586                         timeStamp = tsProp;
587                         break;
588                     }
589                 }
590             }
591
592             // Now lets make sure new properties are non-null
593             if (props == null) {
594                 props = new HashSet<Property>();
595             } else {
596                 // Copy the set so noone is going to change the content
597                 props = new HashSet<Property>(props);
598             }
599
600             // Now lets remove the creation property if exist in the
601             // new props
602             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
603                 Property prop = i.next();
604                 if (prop instanceof TimeStamp) {
605                     TimeStamp t = (TimeStamp) prop;
606                     if (t.getTimeStampName().equals("creation")) {
607                         i.remove();
608                         break;
609                     }
610                 }
611             }
612
613             // Now lets add the creation timestamp in it
614             if (timeStamp != null) {
615                 props.add(timeStamp);
616             }
617
618             // Finally update
619             this.edgesDB.put(e, props);
620             log.trace("Edge {}  {}", e.toString(), type.name());
621             break;
622         }
623         return new TopoEdgeUpdate(e, props, type);
624     }
625
626     @Override
627     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
628         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
629         for (int i = 0; i < topoedgeupdateList.size(); i++) {
630             Edge e = topoedgeupdateList.get(i).getEdge();
631             Set<Property> p = topoedgeupdateList.get(i).getProperty();
632             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
633             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
634             teuList.add(teu);
635         }
636
637         // Now update the listeners
638         for (ITopologyManagerAware s : this.topologyManagerAware) {
639             try {
640                 s.edgeUpdate(teuList);
641             } catch (Exception exc) {
642                 log.error("Exception on edge update:", exc);
643             }
644         }
645
646     }
647
648     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
649         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
650                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
651         return getLinkTuple(rLink);
652     }
653
654
655     private Edge getLinkTuple(TopologyUserLinkConfig link) {
656         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
657         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
658         try {
659             return new Edge(srcNodeConnector, dstNodeConnector);
660         } catch (Exception e) {
661             return null;
662         }
663     }
664
665     @Override
666     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
667         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
668     }
669
670     @Override
671     public Status addUserLink(TopologyUserLinkConfig userLink) {
672         if (!userLink.isValid()) {
673             return new Status(StatusCode.BADREQUEST,
674                     "User link configuration invalid.");
675         }
676         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
677
678         //Check if this link already configured
679         //NOTE: infinispan cache doesn't support Map.containsValue()
680         // (which is linear time in most ConcurrentMap impl anyway)
681         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
682             if (existingLink.equals(userLink)) {
683                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
684             }
685         }
686         //attempt put, if mapping for this key already existed return conflict
687         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
688             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
689                     + " already exists. Please use another name");
690         }
691
692         Edge linkTuple = getLinkTuple(userLink);
693         if (linkTuple != null) {
694             if (!isProductionLink(linkTuple)) {
695                 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
696             }
697
698             linkTuple = getReverseLinkTuple(userLink);
699             if (linkTuple != null) {
700                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
701                 if (!isProductionLink(linkTuple)) {
702                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
703                 }
704             }
705         }
706         return new Status(StatusCode.SUCCESS);
707     }
708
709     @Override
710     public Status deleteUserLink(String linkName) {
711         if (linkName == null) {
712             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
713         }
714
715         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
716         Edge linkTuple;
717         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
718             if (! isProductionLink(linkTuple)) {
719                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
720             }
721
722             linkTuple = getReverseLinkTuple(link);
723             if (! isProductionLink(linkTuple)) {
724                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
725             }
726         }
727         return new Status(StatusCode.SUCCESS);
728     }
729
730     private void registerWithOSGIConsole() {
731         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
732                 .getBundleContext();
733         bundleContext.registerService(CommandProvider.class.getName(), this,
734                 null);
735     }
736
737     @Override
738     public String getHelp() {
739         StringBuffer help = new StringBuffer();
740         help.append("---Topology Manager---\n");
741         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
742         help.append("\t deleteUserLink <name>\n");
743         help.append("\t printUserLink\n");
744         help.append("\t printNodeEdges\n");
745         return help.toString();
746     }
747
748     public void _printUserLink(CommandInterpreter ci) {
749         for (String name : this.userLinksDB.keySet()) {
750             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
751             ci.println("Name : " + name);
752             ci.println(linkConfig);
753             ci.println("Edge " + getLinkTuple(linkConfig));
754             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
755         }
756     }
757
758     public void _addUserLink(CommandInterpreter ci) {
759         String name = ci.nextArgument();
760         if ((name == null)) {
761             ci.println("Please enter a valid Name");
762             return;
763         }
764
765         String ncStr1 = ci.nextArgument();
766         if (ncStr1 == null) {
767             ci.println("Please enter two node connector strings");
768             return;
769         }
770         String ncStr2 = ci.nextArgument();
771         if (ncStr2 == null) {
772             ci.println("Please enter second node connector string");
773             return;
774         }
775
776         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
777         if (nc1 == null) {
778             ci.println("Invalid input node connector 1 string: " + ncStr1);
779             return;
780         }
781         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
782         if (nc2 == null) {
783             ci.println("Invalid input node connector 2 string: " + ncStr2);
784             return;
785         }
786
787         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
788         ci.println(this.addUserLink(config));
789     }
790
791     public void _deleteUserLink(CommandInterpreter ci) {
792         String name = ci.nextArgument();
793         if ((name == null)) {
794             ci.println("Please enter a valid Name");
795             return;
796         }
797         this.deleteUserLink(name);
798     }
799
800     public void _printNodeEdges(CommandInterpreter ci) {
801         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
802         if (nodeEdges == null) {
803             return;
804         }
805         Set<Node> nodeSet = nodeEdges.keySet();
806         if (nodeSet == null) {
807             return;
808         }
809         ci.println("        Node                                         Edge");
810         for (Node node : nodeSet) {
811             Set<Edge> edgeSet = nodeEdges.get(node);
812             if (edgeSet == null) {
813                 continue;
814             }
815             for (Edge edge : edgeSet) {
816                 ci.println(node + "             " + edge);
817             }
818         }
819     }
820
821     @Override
822     public Object readObject(ObjectInputStream ois)
823             throws FileNotFoundException, IOException, ClassNotFoundException {
824         return ois.readObject();
825     }
826
827     @Override
828     public Status saveConfiguration() {
829         return saveConfig();
830     }
831
832     @Override
833     public void edgeOverUtilized(Edge edge) {
834         log.warn("Link Utilization above normal: {}", edge);
835     }
836
837     @Override
838     public void edgeUtilBackToNormal(Edge edge) {
839         log.warn("Link Utilization back to normal: {}", edge);
840     }
841
842     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
843         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
844         upd.setLocal(isLocal);
845         notifyQ.add(upd);
846     }
847
848     @Override
849     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
850         if (cacheName.equals(TOPOEDGESDB)) {
851             // This is the case of an Edge being added to the topology DB
852             final Edge e = (Edge) key;
853             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
854             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
855         }
856     }
857
858     @Override
859     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
860         if (cacheName.equals(TOPOEDGESDB)) {
861             final Edge e = (Edge) key;
862             log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
863             final Set<Property> props = (Set<Property>) new_value;
864             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
865         }
866     }
867
868     @Override
869     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
870         if (cacheName.equals(TOPOEDGESDB)) {
871             final Edge e = (Edge) key;
872             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
873             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
874         }
875     }
876
877     class TopologyNotify implements Runnable {
878         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
879         private TopoEdgeUpdate entry;
880         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
881         private boolean notifyListeners;
882
883         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
884             this.notifyQ = notifyQ;
885         }
886
887         @Override
888         public void run() {
889             while (true) {
890                 try {
891                     log.trace("New run of TopologyNotify");
892                     notifyListeners = false;
893                     // First we block waiting for an element to get in
894                     entry = notifyQ.take();
895                     // Then we drain the whole queue if elements are
896                     // in it without getting into any blocking condition
897                     for (; entry != null; entry = notifyQ.poll()) {
898                         teuList.add(entry);
899                         notifyListeners = true;
900                     }
901
902                     // Notify listeners only if there were updates drained else
903                     // give up
904                     if (notifyListeners) {
905                         log.trace("Notifier thread, notified a listener");
906                         // Now update the listeners
907                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
908                             try {
909                                 s.edgeUpdate(teuList);
910                             } catch (Exception exc) {
911                                 log.error("Exception on edge update:", exc);
912                             }
913                         }
914                     }
915                     teuList.clear();
916
917                     // Lets sleep for sometime to allow aggregation of event
918                     Thread.sleep(100);
919                 } catch (InterruptedException e1) {
920                     if (shuttingDown) {
921                         return;
922                     }
923                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
924                 } catch (Exception e2) {
925                     log.error("", e2);
926                 }
927             }
928         }
929     }
930 }