Merge "Added validation for protocol field while adding flow"
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.topologymanager.ITopologyManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
59 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * The class describes TopologyManager which is the central repository of the
67  * network topology. It provides service for applications to interact with
68  * topology database and notifies all the listeners of topology changes.
69  */
70 public class TopologyManagerImpl implements
71         ICacheUpdateAware,
72         ITopologyManager,
73         IConfigurationContainerAware,
74         IListenTopoUpdates,
75         IObjectReader,
76         CommandProvider {
77     static final String TOPOEDGESDB = "topologymanager.edgesDB";
78     static final String TOPOHOSTSDB = "topologymanager.hostsDB";
79     static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
80     static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
81     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
82     private static final String SAVE = "Save";
83     private ITopologyService topoService;
84     private IClusterContainerServices clusterContainerService;
85     // DB of all the Edges with properties which constitute our topology
86     private ConcurrentMap<Edge, Set<Property>> edgesDB;
87     // DB of all NodeConnector which are part of ISL Edges, meaning they
88     // are connected to another NodeConnector on the other side of an ISL link.
89     // NodeConnector of a Production Edge is not part of this DB.
90     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
91     // DB of all the NodeConnectors with an Host attached to it
92     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
93     // Topology Manager Aware listeners
94     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
95     // Topology Manager Aware listeners - for clusterwide updates
96     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
97             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
98     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
99     private String userLinksFileName;
100     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
101     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
102     private volatile Boolean shuttingDown = false;
103     private Thread notifyThread;
104
105
106     void nonClusterObjectCreate() {
107         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
108         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
109         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
110         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
111     }
112
113     void setTopologyManagerAware(ITopologyManagerAware s) {
114         if (this.topologyManagerAware != null) {
115             log.debug("Adding ITopologyManagerAware: {}", s);
116             this.topologyManagerAware.add(s);
117         }
118     }
119
120     void unsetTopologyManagerAware(ITopologyManagerAware s) {
121         if (this.topologyManagerAware != null) {
122             log.debug("Removing ITopologyManagerAware: {}", s);
123             this.topologyManagerAware.remove(s);
124         }
125     }
126
127     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
128         if (this.topologyManagerClusterWideAware != null) {
129             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
130             this.topologyManagerClusterWideAware.add(s);
131         }
132     }
133
134     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
135         if (this.topologyManagerClusterWideAware != null) {
136             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
137             this.topologyManagerClusterWideAware.remove(s);
138         }
139     }
140
141     void setTopoService(ITopologyService s) {
142         log.debug("Adding ITopologyService: {}", s);
143         this.topoService = s;
144     }
145
146     void unsetTopoService(ITopologyService s) {
147         if (this.topoService == s) {
148             log.debug("Removing ITopologyService: {}", s);
149             this.topoService = null;
150         }
151     }
152
153     void setClusterContainerService(IClusterContainerServices s) {
154         log.debug("Cluster Service set");
155         this.clusterContainerService = s;
156     }
157
158     void unsetClusterContainerService(IClusterContainerServices s) {
159         if (this.clusterContainerService == s) {
160             log.debug("Cluster Service removed!");
161             this.clusterContainerService = null;
162         }
163     }
164
165     /**
166      * Function called by the dependency manager when all the required
167      * dependencies are satisfied
168      *
169      */
170     void init(Component c) {
171         allocateCaches();
172         retrieveCaches();
173         String containerName = null;
174         Dictionary<?, ?> props = c.getServiceProperties();
175         if (props != null) {
176             containerName = (String) props.get("containerName");
177         } else {
178             // In the Global instance case the containerName is empty
179             containerName = "UNKNOWN";
180         }
181
182         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
183         registerWithOSGIConsole();
184         loadConfiguration();
185         // Restore the shuttingDown status on init of the component
186         shuttingDown = false;
187         notifyThread = new Thread(new TopologyNotify(notifyQ));
188     }
189
190     @SuppressWarnings({ "unchecked", "deprecation" })
191     private void allocateCaches() {
192         try {
193             this.edgesDB =
194                     (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
195                             EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
196         } catch (CacheExistException cee) {
197             log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
198         } catch (CacheConfigException cce) {
199             log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
200         }
201
202         try {
203             this.hostsDB =
204                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
205                             TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
206         } catch (CacheExistException cee) {
207             log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
208         } catch (CacheConfigException cce) {
209             log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
210         }
211
212         try {
213             this.nodeConnectorsDB =
214                     (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
215                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
216         } catch (CacheExistException cee) {
217             log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
218         } catch (CacheConfigException cce) {
219             log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
220         }
221
222         try {
223             this.userLinksDB =
224                     (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
225                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
226         } catch (CacheExistException cee) {
227             log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
228         } catch (CacheConfigException cce) {
229             log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
230         }
231     }
232
233     @SuppressWarnings({ "unchecked", "deprecation" })
234     private void retrieveCaches() {
235         if (this.clusterContainerService == null) {
236             log.error("Cluster Services is null, can't retrieve caches.");
237             return;
238         }
239
240         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
241         if (edgesDB == null) {
242             log.error("Failed to get cache for " + TOPOEDGESDB);
243         }
244
245         this.hostsDB =
246                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
247         if (hostsDB == null) {
248             log.error("Failed to get cache for " + TOPOHOSTSDB);
249         }
250
251         this.nodeConnectorsDB =
252                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
253         if (nodeConnectorsDB == null) {
254             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
255         }
256
257         this.userLinksDB =
258                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
259         if (userLinksDB == null) {
260             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
261         }
262     }
263
264     /**
265      * Function called after the topology manager has registered the service in
266      * OSGi service registry.
267      *
268      */
269     void started() {
270         // Start the batcher thread for the cluster wide topology updates
271         notifyThread.start();
272         // SollicitRefresh MUST be called here else if called at init
273         // time it may sollicit refresh too soon.
274         log.debug("Sollicit topology refresh");
275         topoService.sollicitRefresh();
276     }
277
278     void stop() {
279         shuttingDown = true;
280         notifyThread.interrupt();
281     }
282
283     /**
284      * Function called by the dependency manager when at least one dependency
285      * become unsatisfied or when the component is shutting down because for
286      * example bundle is being stopped.
287      *
288      */
289     void destroy() {
290         notifyQ.clear();
291         notifyThread = null;
292     }
293
294     @SuppressWarnings("unchecked")
295     private void loadConfiguration() {
296         ObjectReader objReader = new ObjectReader();
297         ConcurrentMap<String, TopologyUserLinkConfig> confList =
298                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
299
300         if (confList != null) {
301             for (TopologyUserLinkConfig conf : confList.values()) {
302                 addUserLink(conf);
303             }
304         }
305     }
306
307     @Override
308     public Status saveConfig() {
309         return saveConfigInternal();
310     }
311
312     public Status saveConfigInternal() {
313         ObjectWriter objWriter = new ObjectWriter();
314
315         Status saveStatus = objWriter.write(
316                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
317
318         if (! saveStatus.isSuccess()) {
319             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
320         }
321         return saveStatus;
322     }
323
324     @Override
325     public Map<Node, Set<Edge>> getNodeEdges() {
326         if (this.edgesDB == null) {
327             return null;
328         }
329
330         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
331         for (Edge edge : this.edgesDB.keySet()) {
332             // Lets analyze the tail
333             Node node = edge.getTailNodeConnector().getNode();
334             Set<Edge> nodeEdges = res.get(node);
335             if (nodeEdges == null) {
336                 nodeEdges = new HashSet<Edge>();
337                 res.put(node, nodeEdges);
338             }
339             nodeEdges.add(edge);
340
341             // Lets analyze the head
342             node = edge.getHeadNodeConnector().getNode();
343             nodeEdges = res.get(node);
344             if (nodeEdges == null) {
345                 nodeEdges = new HashSet<Edge>();
346                 res.put(node, nodeEdges);
347             }
348             nodeEdges.add(edge);
349         }
350
351         return res;
352     }
353
354     @Override
355     public boolean isInternal(NodeConnector p) {
356         if (this.nodeConnectorsDB == null) {
357             return false;
358         }
359
360         // This is an internal NodeConnector if is connected to
361         // another Node i.e it's part of the nodeConnectorsDB
362         return (this.nodeConnectorsDB.get(p) != null);
363     }
364
365     /**
366      * This method returns true if the edge is an ISL link.
367      *
368      * @param e
369      *            The edge
370      * @return true if it is an ISL link
371      */
372     public boolean isISLink(Edge e) {
373         return (!isProductionLink(e));
374     }
375
376     /**
377      * This method returns true if the edge is a production link.
378      *
379      * @param e
380      *            The edge
381      * @return true if it is a production link
382      */
383     public boolean isProductionLink(Edge e) {
384         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
385                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
386     }
387
388     /**
389      * The Map returned is a copy of the current topology hence if the topology
390      * changes the copy doesn't
391      *
392      * @return A Map representing the current topology expressed as edges of the
393      *         network
394      */
395     @Override
396     public Map<Edge, Set<Property>> getEdges() {
397         if (this.edgesDB == null) {
398             return null;
399         }
400
401         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
402         Set<Property> props;
403         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
404             // Sets of props are copied because the composition of
405             // those properties could change with time
406             props = new HashSet<Property>(edgeEntry.getValue());
407             // We can simply reuse the key because the object is
408             // immutable so doesn't really matter that we are
409             // referencing the only owned by a different table, the
410             // meaning is the same because doesn't change with time.
411             edgeMap.put(edgeEntry.getKey(), props);
412         }
413
414         return edgeMap;
415     }
416
417     @Override
418     public Set<NodeConnector> getNodeConnectorWithHost() {
419         if (this.hostsDB == null) {
420             return null;
421         }
422
423         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
424     }
425
426     @Override
427     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
428         if (this.hostsDB == null) {
429             return null;
430         }
431         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
432         Node node;
433         Set<NodeConnector> portSet;
434         for (NodeConnector nc : this.hostsDB.keySet()) {
435             node = nc.getNode();
436             portSet = res.get(node);
437             if (portSet == null) {
438                 // Create the HashSet if null
439                 portSet = new HashSet<NodeConnector>();
440                 res.put(node, portSet);
441             }
442
443             // Keep updating the HashSet, given this is not a
444             // clustered map we can just update the set without
445             // worrying to update the hashmap.
446             portSet.add(nc);
447         }
448
449         return (res);
450     }
451
452     @Override
453     public Host getHostAttachedToNodeConnector(NodeConnector port) {
454         List<Host> hosts = getHostsAttachedToNodeConnector(port);
455         if(hosts != null && !hosts.isEmpty()){
456             return hosts.get(0);
457         }
458         return null;
459     }
460
461     @Override
462     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
463         Set<ImmutablePair<Host, Set<Property>>> hosts;
464         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
465             return null;
466         }
467         // create a list of hosts
468         List<Host> retHosts = new LinkedList<Host>();
469         for(ImmutablePair<Host, Set<Property>> host : hosts) {
470             retHosts.add(host.getLeft());
471         }
472         return retHosts;
473     }
474
475     @Override
476     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
477
478         // Clone the property set in case non null else just
479         // create an empty one. Caches allocated via infinispan
480         // don't allow null values
481         if (props == null) {
482             props = new HashSet<Property>();
483         } else {
484             props = new HashSet<Property>(props);
485         }
486         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
487
488         // get the host list
489         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
490         if(hostSet == null) {
491             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
492         }
493         switch (t) {
494         case ADDED:
495         case CHANGED:
496             hostSet.add(thisHost);
497             this.hostsDB.put(port, hostSet);
498             break;
499         case REMOVED:
500             hostSet.remove(thisHost);
501             if(hostSet.isEmpty()) {
502                 //remove only if hasn't been concurrently modified
503                 this.hostsDB.remove(port, hostSet);
504             } else {
505                 this.hostsDB.put(port, hostSet);
506             }
507             break;
508         }
509     }
510
511     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
512         switch (type) {
513         case ADDED:
514             // Make sure the props are non-null
515             if (props == null) {
516                 props = new HashSet<Property>();
517             } else {
518                 props = new HashSet<Property>(props);
519             }
520
521             //in case of node switch-over to a different cluster controller,
522             //let's retain edge props
523             Set<Property> currentProps = this.edgesDB.get(e);
524             if (currentProps != null){
525                 props.addAll(currentProps);
526             }
527
528             // Now make sure there is the creation timestamp for the
529             // edge, if not there, stamp with the first update
530             boolean found_create = false;
531             for (Property prop : props) {
532                 if (prop instanceof TimeStamp) {
533                     TimeStamp t = (TimeStamp) prop;
534                     if (t.getTimeStampName().equals("creation")) {
535                         found_create = true;
536                         break;
537                     }
538                 }
539             }
540
541             if (!found_create) {
542                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
543                 props.add(t);
544             }
545
546             // Now add this in the database eventually overriding
547             // something that may have been already existing
548             this.edgesDB.put(e, props);
549
550             // Now populate the DB of NodeConnectors
551             // NOTE WELL: properties are empty sets, not really needed
552             // for now.
553             // The DB only contains ISL ports
554             if (isISLink(e)) {
555                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
556                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
557             }
558             log.trace("Edge {}  {}", e.toString(), type.name());
559             break;
560         case REMOVED:
561             // Now remove the edge from edgesDB
562             this.edgesDB.remove(e);
563
564             // Now lets update the NodeConnectors DB, the assumption
565             // here is that two NodeConnector are exclusively
566             // connected by 1 and only 1 edge, this is reasonable in
567             // the same plug (virtual of phisical) we can assume two
568             // cables won't be plugged. This could break only in case
569             // of devices in the middle that acts as hubs, but it
570             // should be safe to assume that won't happen.
571             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
572             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
573             log.trace("Edge {}  {}", e.toString(), type.name());
574             break;
575         case CHANGED:
576             Set<Property> oldProps = this.edgesDB.get(e);
577
578             // When property changes lets make sure we can change it
579             // all except the creation time stamp because that should
580             // be changed only when the edge is destroyed and created
581             // again
582             TimeStamp timeStamp = null;
583             for (Property prop : oldProps) {
584                 if (prop instanceof TimeStamp) {
585                     TimeStamp tsProp = (TimeStamp) prop;
586                     if (tsProp.getTimeStampName().equals("creation")) {
587                         timeStamp = tsProp;
588                         break;
589                     }
590                 }
591             }
592
593             // Now lets make sure new properties are non-null
594             if (props == null) {
595                 props = new HashSet<Property>();
596             } else {
597                 // Copy the set so noone is going to change the content
598                 props = new HashSet<Property>(props);
599             }
600
601             // Now lets remove the creation property if exist in the
602             // new props
603             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
604                 Property prop = i.next();
605                 if (prop instanceof TimeStamp) {
606                     TimeStamp t = (TimeStamp) prop;
607                     if (t.getTimeStampName().equals("creation")) {
608                         i.remove();
609                         break;
610                     }
611                 }
612             }
613
614             // Now lets add the creation timestamp in it
615             if (timeStamp != null) {
616                 props.add(timeStamp);
617             }
618
619             // Finally update
620             this.edgesDB.put(e, props);
621             log.trace("Edge {}  {}", e.toString(), type.name());
622             break;
623         }
624         return new TopoEdgeUpdate(e, props, type);
625     }
626
627     @Override
628     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
629         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
630         for (int i = 0; i < topoedgeupdateList.size(); i++) {
631             Edge e = topoedgeupdateList.get(i).getEdge();
632             Set<Property> p = topoedgeupdateList.get(i).getProperty();
633             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
634             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
635             teuList.add(teu);
636         }
637
638         // Now update the listeners
639         for (ITopologyManagerAware s : this.topologyManagerAware) {
640             try {
641                 s.edgeUpdate(teuList);
642             } catch (Exception exc) {
643                 log.error("Exception on edge update:", exc);
644             }
645         }
646
647     }
648
649     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
650         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
651                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
652         return getLinkTuple(rLink);
653     }
654
655
656     private Edge getLinkTuple(TopologyUserLinkConfig link) {
657         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
658         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
659         try {
660             return new Edge(srcNodeConnector, dstNodeConnector);
661         } catch (Exception e) {
662             return null;
663         }
664     }
665
666     @Override
667     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
668         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
669     }
670
671     @Override
672     public Status addUserLink(TopologyUserLinkConfig userLink) {
673         if (!userLink.isValid()) {
674             return new Status(StatusCode.BADREQUEST,
675                     "User link configuration invalid.");
676         }
677         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
678
679         //Check if this link already configured
680         //NOTE: infinispan cache doesn't support Map.containsValue()
681         // (which is linear time in most ConcurrentMap impl anyway)
682         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
683             if (existingLink.equals(userLink)) {
684                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
685             }
686         }
687         //attempt put, if mapping for this key already existed return conflict
688         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
689             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
690                     + " already exists. Please use another name");
691         }
692
693         Edge linkTuple = getLinkTuple(userLink);
694         if (linkTuple != null) {
695             if (!isProductionLink(linkTuple)) {
696                 edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
697             }
698
699             linkTuple = getReverseLinkTuple(userLink);
700             if (linkTuple != null) {
701                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
702                 if (!isProductionLink(linkTuple)) {
703                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
704                 }
705             }
706         }
707         return new Status(StatusCode.SUCCESS);
708     }
709
710     @Override
711     public Status deleteUserLink(String linkName) {
712         if (linkName == null) {
713             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
714         }
715
716         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
717         Edge linkTuple;
718         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
719             if (! isProductionLink(linkTuple)) {
720                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
721             }
722
723             linkTuple = getReverseLinkTuple(link);
724             if (! isProductionLink(linkTuple)) {
725                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
726             }
727         }
728         return new Status(StatusCode.SUCCESS);
729     }
730
731     private void registerWithOSGIConsole() {
732         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
733                 .getBundleContext();
734         bundleContext.registerService(CommandProvider.class.getName(), this,
735                 null);
736     }
737
738     @Override
739     public String getHelp() {
740         StringBuffer help = new StringBuffer();
741         help.append("---Topology Manager---\n");
742         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
743         help.append("\t deleteUserLink <name>\n");
744         help.append("\t printUserLink\n");
745         help.append("\t printNodeEdges\n");
746         return help.toString();
747     }
748
749     public void _printUserLink(CommandInterpreter ci) {
750         for (String name : this.userLinksDB.keySet()) {
751             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
752             ci.println("Name : " + name);
753             ci.println(linkConfig);
754             ci.println("Edge " + getLinkTuple(linkConfig));
755             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
756         }
757     }
758
759     public void _addUserLink(CommandInterpreter ci) {
760         String name = ci.nextArgument();
761         if ((name == null)) {
762             ci.println("Please enter a valid Name");
763             return;
764         }
765
766         String ncStr1 = ci.nextArgument();
767         if (ncStr1 == null) {
768             ci.println("Please enter two node connector strings");
769             return;
770         }
771         String ncStr2 = ci.nextArgument();
772         if (ncStr2 == null) {
773             ci.println("Please enter second node connector string");
774             return;
775         }
776
777         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
778         if (nc1 == null) {
779             ci.println("Invalid input node connector 1 string: " + ncStr1);
780             return;
781         }
782         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
783         if (nc2 == null) {
784             ci.println("Invalid input node connector 2 string: " + ncStr2);
785             return;
786         }
787
788         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
789         ci.println(this.addUserLink(config));
790     }
791
792     public void _deleteUserLink(CommandInterpreter ci) {
793         String name = ci.nextArgument();
794         if ((name == null)) {
795             ci.println("Please enter a valid Name");
796             return;
797         }
798         this.deleteUserLink(name);
799     }
800
801     public void _printNodeEdges(CommandInterpreter ci) {
802         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
803         if (nodeEdges == null) {
804             return;
805         }
806         Set<Node> nodeSet = nodeEdges.keySet();
807         if (nodeSet == null) {
808             return;
809         }
810         ci.println("        Node                                         Edge");
811         for (Node node : nodeSet) {
812             Set<Edge> edgeSet = nodeEdges.get(node);
813             if (edgeSet == null) {
814                 continue;
815             }
816             for (Edge edge : edgeSet) {
817                 ci.println(node + "             " + edge);
818             }
819         }
820     }
821
822     @Override
823     public Object readObject(ObjectInputStream ois)
824             throws FileNotFoundException, IOException, ClassNotFoundException {
825         return ois.readObject();
826     }
827
828     @Override
829     public Status saveConfiguration() {
830         return saveConfig();
831     }
832
833     @Override
834     public void edgeOverUtilized(Edge edge) {
835         log.warn("Link Utilization above normal: {}", edge);
836     }
837
838     @Override
839     public void edgeUtilBackToNormal(Edge edge) {
840         log.warn("Link Utilization back to normal: {}", edge);
841     }
842
843     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
844         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
845         upd.setLocal(isLocal);
846         notifyQ.add(upd);
847     }
848
849     @Override
850     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
851         if (cacheName.equals(TOPOEDGESDB)) {
852             // This is the case of an Edge being added to the topology DB
853             final Edge e = (Edge) key;
854             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
855             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
856         }
857     }
858
859     @Override
860     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
861         if (cacheName.equals(TOPOEDGESDB)) {
862             final Edge e = (Edge) key;
863             log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
864             final Set<Property> props = (Set<Property>) new_value;
865             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
866         }
867     }
868
869     @Override
870     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
871         if (cacheName.equals(TOPOEDGESDB)) {
872             final Edge e = (Edge) key;
873             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
874             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
875         }
876     }
877
878     class TopologyNotify implements Runnable {
879         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
880         private TopoEdgeUpdate entry;
881         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
882         private boolean notifyListeners;
883
884         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
885             this.notifyQ = notifyQ;
886         }
887
888         @Override
889         public void run() {
890             while (true) {
891                 try {
892                     log.trace("New run of TopologyNotify");
893                     notifyListeners = false;
894                     // First we block waiting for an element to get in
895                     entry = notifyQ.take();
896                     // Then we drain the whole queue if elements are
897                     // in it without getting into any blocking condition
898                     for (; entry != null; entry = notifyQ.poll()) {
899                         teuList.add(entry);
900                         notifyListeners = true;
901                     }
902
903                     // Notify listeners only if there were updates drained else
904                     // give up
905                     if (notifyListeners) {
906                         log.trace("Notifier thread, notified a listener");
907                         // Now update the listeners
908                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
909                             try {
910                                 s.edgeUpdate(teuList);
911                             } catch (Exception exc) {
912                                 log.error("Exception on edge update:", exc);
913                             }
914                         }
915                     }
916                     teuList.clear();
917
918                     // Lets sleep for sometime to allow aggregation of event
919                     Thread.sleep(100);
920                 } catch (InterruptedException e1) {
921                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
922                     if (shuttingDown) {
923                         return;
924                     }
925                 } catch (Exception e2) {
926                     log.error("", e2);
927                 }
928             }
929         }
930     }
931 }