Topology Manager changes for cross checking Production switches
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
53 import org.opendaylight.controller.sal.utils.ObjectReader;
54 import org.opendaylight.controller.sal.utils.ObjectWriter;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.controller.switchmanager.ISwitchManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManager;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
60 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
61 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
62 import org.osgi.framework.BundleContext;
63 import org.osgi.framework.FrameworkUtil;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * The class describes TopologyManager which is the central repository of the
69  * network topology. It provides service for applications to interact with
70  * topology database and notifies all the listeners of topology changes.
71  */
72 public class TopologyManagerImpl implements
73         ICacheUpdateAware<Object, Object>,
74         ITopologyManager,
75         IConfigurationContainerAware,
76         IListenTopoUpdates,
77         IObjectReader,
78         CommandProvider {
79     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
80     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
81     protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
82     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
83     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
84     private ITopologyService topoService;
85     private IClusterContainerServices clusterContainerService;
86     private ISwitchManager switchManager;
87     // DB of all the Edges with properties which constitute our topology
88     private ConcurrentMap<Edge, Set<Property>> edgesDB;
89     // DB of all NodeConnector which are part of ISL Edges, meaning they
90     // are connected to another NodeConnector on the other side of an ISL link.
91     // NodeConnector of a Production Edge is not part of this DB.
92     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
93     // DB of all the NodeConnectors with an Host attached to it
94     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
95     // Topology Manager Aware listeners
96     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
97     // Topology Manager Aware listeners - for clusterwide updates
98     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
99             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
100     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
101     private String userLinksFileName;
102     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
103     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
104     private volatile Boolean shuttingDown = false;
105     private Thread notifyThread;
106
107
108     void nonClusterObjectCreate() {
109         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
110         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
111         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
112         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
113     }
114
115     void setTopologyManagerAware(ITopologyManagerAware s) {
116         if (this.topologyManagerAware != null) {
117             log.debug("Adding ITopologyManagerAware: {}", s);
118             this.topologyManagerAware.add(s);
119         }
120     }
121
122     void unsetTopologyManagerAware(ITopologyManagerAware s) {
123         if (this.topologyManagerAware != null) {
124             log.debug("Removing ITopologyManagerAware: {}", s);
125             this.topologyManagerAware.remove(s);
126         }
127     }
128
129     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
130         if (this.topologyManagerClusterWideAware != null) {
131             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
132             this.topologyManagerClusterWideAware.add(s);
133         }
134     }
135
136     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
137         if (this.topologyManagerClusterWideAware != null) {
138             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
139             this.topologyManagerClusterWideAware.remove(s);
140         }
141     }
142
143     void setTopoService(ITopologyService s) {
144         log.debug("Adding ITopologyService: {}", s);
145         this.topoService = s;
146     }
147
148     void unsetTopoService(ITopologyService s) {
149         if (this.topoService == s) {
150             log.debug("Removing ITopologyService: {}", s);
151             this.topoService = null;
152         }
153     }
154
155     void setClusterContainerService(IClusterContainerServices s) {
156         log.debug("Cluster Service set");
157         this.clusterContainerService = s;
158     }
159
160     void unsetClusterContainerService(IClusterContainerServices s) {
161         if (this.clusterContainerService == s) {
162             log.debug("Cluster Service removed!");
163             this.clusterContainerService = null;
164         }
165     }
166
167     void setSwitchManager(ISwitchManager s) {
168         log.debug("Adding ISwitchManager: {}", s);
169         this.switchManager = s;
170     }
171
172     void unsetSwitchManager(ISwitchManager s) {
173         if (this.switchManager == s) {
174             log.debug("Removing ISwitchManager: {}", s);
175             this.switchManager = null;
176         }
177     }
178
179     /**
180      * Function called by the dependency manager when all the required
181      * dependencies are satisfied
182      *
183      */
184     void init(Component c) {
185         allocateCaches();
186         retrieveCaches();
187         String containerName = null;
188         Dictionary<?, ?> props = c.getServiceProperties();
189         if (props != null) {
190             containerName = (String) props.get("containerName");
191         } else {
192             // In the Global instance case the containerName is empty
193             containerName = "UNKNOWN";
194         }
195
196         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
197         registerWithOSGIConsole();
198         loadConfiguration();
199         // Restore the shuttingDown status on init of the component
200         shuttingDown = false;
201         notifyThread = new Thread(new TopologyNotify(notifyQ));
202     }
203
204     @SuppressWarnings({ "unchecked" })
205     private void allocateCaches() {
206             this.edgesDB =
207                     (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
208
209             this.hostsDB =
210                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
211
212             this.nodeConnectorsDB =
213                     (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
214                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
215             this.userLinksDB =
216                     (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
217                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
218     }
219
220     private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
221         ConcurrentMap<?, ?> cache = null;
222         try {
223             cache = this.clusterContainerService.createCache(cacheName, cacheModes);
224         } catch (CacheExistException e) {
225             log.debug(cacheName + " cache already exists - destroy and recreate if needed");
226         } catch (CacheConfigException e) {
227             log.error(cacheName + " cache configuration invalid - check cache mode");
228         }
229         return cache;
230     }
231
232     @SuppressWarnings({ "unchecked" })
233     private void retrieveCaches() {
234         if (this.clusterContainerService == null) {
235             log.error("Cluster Services is null, can't retrieve caches.");
236             return;
237         }
238
239         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
240         if (edgesDB == null) {
241             log.error("Failed to get cache for " + TOPOEDGESDB);
242         }
243
244         this.hostsDB =
245                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
246         if (hostsDB == null) {
247             log.error("Failed to get cache for " + TOPOHOSTSDB);
248         }
249
250         this.nodeConnectorsDB =
251                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
252         if (nodeConnectorsDB == null) {
253             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
254         }
255
256         this.userLinksDB =
257                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
258         if (userLinksDB == null) {
259             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
260         }
261     }
262
263     /**
264      * Function called after the topology manager has registered the service in
265      * OSGi service registry.
266      *
267      */
268     void started() {
269         // Start the batcher thread for the cluster wide topology updates
270         notifyThread.start();
271         // SollicitRefresh MUST be called here else if called at init
272         // time it may sollicit refresh too soon.
273         log.debug("Sollicit topology refresh");
274         topoService.sollicitRefresh();
275     }
276
277     void stop() {
278         shuttingDown = true;
279         notifyThread.interrupt();
280     }
281
282     /**
283      * Function called by the dependency manager when at least one dependency
284      * become unsatisfied or when the component is shutting down because for
285      * example bundle is being stopped.
286      *
287      */
288     void destroy() {
289         notifyQ.clear();
290         notifyThread = null;
291     }
292
293     @SuppressWarnings("unchecked")
294     private void loadConfiguration() {
295         ObjectReader objReader = new ObjectReader();
296         ConcurrentMap<String, TopologyUserLinkConfig> confList =
297                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
298
299         if (confList != null) {
300             for (TopologyUserLinkConfig conf : confList.values()) {
301                 addUserLink(conf);
302             }
303         }
304     }
305
306     @Override
307     public Status saveConfig() {
308         return saveConfigInternal();
309     }
310
311     public Status saveConfigInternal() {
312         ObjectWriter objWriter = new ObjectWriter();
313
314         Status saveStatus = objWriter.write(
315                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
316
317         if (! saveStatus.isSuccess()) {
318             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
319         }
320         return saveStatus;
321     }
322
323     @Override
324     public Map<Node, Set<Edge>> getNodeEdges() {
325         if (this.edgesDB == null) {
326             return null;
327         }
328
329         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
330         for (Edge edge : this.edgesDB.keySet()) {
331             // Lets analyze the tail
332             Node node = edge.getTailNodeConnector().getNode();
333             Set<Edge> nodeEdges = res.get(node);
334             if (nodeEdges == null) {
335                 nodeEdges = new HashSet<Edge>();
336                 res.put(node, nodeEdges);
337             }
338             nodeEdges.add(edge);
339
340             // Lets analyze the head
341             node = edge.getHeadNodeConnector().getNode();
342             nodeEdges = res.get(node);
343             if (nodeEdges == null) {
344                 nodeEdges = new HashSet<Edge>();
345                 res.put(node, nodeEdges);
346             }
347             nodeEdges.add(edge);
348         }
349
350         return res;
351     }
352
353     @Override
354     public boolean isInternal(NodeConnector p) {
355         if (this.nodeConnectorsDB == null) {
356             return false;
357         }
358
359         // This is an internal NodeConnector if is connected to
360         // another Node i.e it's part of the nodeConnectorsDB
361         return (this.nodeConnectorsDB.get(p) != null);
362     }
363
364     /**
365      * This method returns true if the edge is an ISL link.
366      *
367      * @param e
368      *            The edge
369      * @return true if it is an ISL link
370      */
371     public boolean isISLink(Edge e) {
372         return (!isProductionLink(e));
373     }
374
375     /**
376      * This method returns true if the edge is a production link.
377      *
378      * @param e
379      *            The edge
380      * @return true if it is a production link
381      */
382     public boolean isProductionLink(Edge e) {
383         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
384                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
385     }
386
387     /**
388      * This method cross checks the determination of nodeConnector type by Discovery Service
389      * against the information in SwitchManager and updates it accordingly.
390      * @param e
391      *          The edge
392      */
393     private void crossCheckNodeConnectors(Edge e) {
394         NodeConnector nc;
395         if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
396             nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
397             if (nc != null) {
398                 e.setHeadNodeConnector(nc);
399             }
400         }
401         if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
402             nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
403             if (nc != null) {
404                 e.setTailNodeConnector(nc);
405             }
406         }
407     }
408
409     /**
410      * A NodeConnector may have been categorized as of type Production by Discovery Service.
411      * But at the time when this determination was made, only OF nodes were known to Discovery
412      * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
413      * so, then it returns a new NodeConnector with correct type.
414      *
415      * @param nc
416      *       NodeConnector as passed on in the edge
417      * @return
418      *       If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
419      *       with correct type, null otherwise
420      */
421
422     private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
423
424         for (Node node : switchManager.getNodes()) {
425             String nodeName = node.getNodeIDString();
426             log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
427                     nc.getNode().getNodeIDString());
428             if (nodeName.equals(nc.getNode().getNodeIDString())) {
429                 NodeConnector nodeConnector = NodeConnectorCreator
430                         .createNodeConnector(node.getType(), nc.getID(), node);
431                 return nodeConnector;
432             }
433         }
434         return null;
435     }
436
437     /**
438      * The Map returned is a copy of the current topology hence if the topology
439      * changes the copy doesn't
440      *
441      * @return A Map representing the current topology expressed as edges of the
442      *         network
443      */
444     @Override
445     public Map<Edge, Set<Property>> getEdges() {
446         if (this.edgesDB == null) {
447             return null;
448         }
449
450         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
451         Set<Property> props;
452         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
453             // Sets of props are copied because the composition of
454             // those properties could change with time
455             props = new HashSet<Property>(edgeEntry.getValue());
456             // We can simply reuse the key because the object is
457             // immutable so doesn't really matter that we are
458             // referencing the only owned by a different table, the
459             // meaning is the same because doesn't change with time.
460             edgeMap.put(edgeEntry.getKey(), props);
461         }
462
463         return edgeMap;
464     }
465
466     @Override
467     public Set<NodeConnector> getNodeConnectorWithHost() {
468         if (this.hostsDB == null) {
469             return null;
470         }
471
472         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
473     }
474
475     @Override
476     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
477         if (this.hostsDB == null) {
478             return null;
479         }
480         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
481         Node node;
482         Set<NodeConnector> portSet;
483         for (NodeConnector nc : this.hostsDB.keySet()) {
484             node = nc.getNode();
485             portSet = res.get(node);
486             if (portSet == null) {
487                 // Create the HashSet if null
488                 portSet = new HashSet<NodeConnector>();
489                 res.put(node, portSet);
490             }
491
492             // Keep updating the HashSet, given this is not a
493             // clustered map we can just update the set without
494             // worrying to update the hashmap.
495             portSet.add(nc);
496         }
497
498         return (res);
499     }
500
501     @Override
502     public Host getHostAttachedToNodeConnector(NodeConnector port) {
503         List<Host> hosts = getHostsAttachedToNodeConnector(port);
504         if(hosts != null && !hosts.isEmpty()){
505             return hosts.get(0);
506         }
507         return null;
508     }
509
510     @Override
511     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
512         Set<ImmutablePair<Host, Set<Property>>> hosts;
513         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
514             return null;
515         }
516         // create a list of hosts
517         List<Host> retHosts = new LinkedList<Host>();
518         for(ImmutablePair<Host, Set<Property>> host : hosts) {
519             retHosts.add(host.getLeft());
520         }
521         return retHosts;
522     }
523
524     @Override
525     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
526
527         // Clone the property set in case non null else just
528         // create an empty one. Caches allocated via infinispan
529         // don't allow null values
530         if (props == null) {
531             props = new HashSet<Property>();
532         } else {
533             props = new HashSet<Property>(props);
534         }
535         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
536
537         // get the host list
538         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
539         if(hostSet == null) {
540             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
541         }
542         switch (t) {
543         case ADDED:
544         case CHANGED:
545             hostSet.add(thisHost);
546             this.hostsDB.put(port, hostSet);
547             break;
548         case REMOVED:
549             hostSet.remove(thisHost);
550             if(hostSet.isEmpty()) {
551                 //remove only if hasn't been concurrently modified
552                 this.hostsDB.remove(port, hostSet);
553             } else {
554                 this.hostsDB.put(port, hostSet);
555             }
556             break;
557         }
558     }
559
560     private boolean headNodeConnectorExist(Edge e) {
561         /*
562          * Only check the head end point which is supposed to be part of a
563          * network node we control (present in our inventory). If we checked the
564          * tail end point as well, we would not store the edges that connect to
565          * a non sdn enable port on a non sdn capable production switch. We want
566          * to be able to see these switches on the topology.
567          */
568         NodeConnector head = e.getHeadNodeConnector();
569         return (switchManager.doesNodeConnectorExist(head));
570     }
571
572     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
573         switch (type) {
574         case ADDED:
575             // Avoid redundant update as notifications trigger expensive tasks
576             if (edgesDB.containsKey(e)) {
577                 log.trace("Skipping redundant edge addition: {}", e);
578                 return null;
579             }
580
581             // Ensure that head node connector exists
582             if (!headNodeConnectorExist(e)) {
583                 log.warn("Ignore edge that contains invalid node connector: {}", e);
584                 return null;
585             }
586
587             // Check if nodeConnectors of the edge were correctly categorized
588             // by OF plugin
589             crossCheckNodeConnectors(e);
590
591             // Make sure the props are non-null
592             if (props == null) {
593                 props = new HashSet<Property>();
594             } else {
595                 props = new HashSet<Property>(props);
596             }
597
598             //in case of node switch-over to a different cluster controller,
599             //let's retain edge props
600             Set<Property> currentProps = this.edgesDB.get(e);
601             if (currentProps != null){
602                 props.addAll(currentProps);
603             }
604
605             // Now make sure there is the creation timestamp for the
606             // edge, if not there, stamp with the first update
607             boolean found_create = false;
608             for (Property prop : props) {
609                 if (prop instanceof TimeStamp) {
610                     TimeStamp t = (TimeStamp) prop;
611                     if (t.getTimeStampName().equals("creation")) {
612                         found_create = true;
613                         break;
614                     }
615                 }
616             }
617
618             if (!found_create) {
619                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
620                 props.add(t);
621             }
622
623             // Now add this in the database eventually overriding
624             // something that may have been already existing
625             this.edgesDB.put(e, props);
626
627             // Now populate the DB of NodeConnectors
628             // NOTE WELL: properties are empty sets, not really needed
629             // for now.
630             // The DB only contains ISL ports
631             if (isISLink(e)) {
632                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
633                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
634             }
635             log.trace("Edge {}  {}", e.toString(), type.name());
636             break;
637         case REMOVED:
638             // Now remove the edge from edgesDB
639             this.edgesDB.remove(e);
640
641             // Now lets update the NodeConnectors DB, the assumption
642             // here is that two NodeConnector are exclusively
643             // connected by 1 and only 1 edge, this is reasonable in
644             // the same plug (virtual of phisical) we can assume two
645             // cables won't be plugged. This could break only in case
646             // of devices in the middle that acts as hubs, but it
647             // should be safe to assume that won't happen.
648             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
649             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
650             log.trace("Edge {}  {}", e.toString(), type.name());
651             break;
652         case CHANGED:
653             Set<Property> oldProps = this.edgesDB.get(e);
654
655             // When property changes lets make sure we can change it
656             // all except the creation time stamp because that should
657             // be changed only when the edge is destroyed and created
658             // again
659             TimeStamp timeStamp = null;
660             for (Property prop : oldProps) {
661                 if (prop instanceof TimeStamp) {
662                     TimeStamp tsProp = (TimeStamp) prop;
663                     if (tsProp.getTimeStampName().equals("creation")) {
664                         timeStamp = tsProp;
665                         break;
666                     }
667                 }
668             }
669
670             // Now lets make sure new properties are non-null
671             if (props == null) {
672                 props = new HashSet<Property>();
673             } else {
674                 // Copy the set so noone is going to change the content
675                 props = new HashSet<Property>(props);
676             }
677
678             // Now lets remove the creation property if exist in the
679             // new props
680             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
681                 Property prop = i.next();
682                 if (prop instanceof TimeStamp) {
683                     TimeStamp t = (TimeStamp) prop;
684                     if (t.getTimeStampName().equals("creation")) {
685                         i.remove();
686                         break;
687                     }
688                 }
689             }
690
691             // Now lets add the creation timestamp in it
692             if (timeStamp != null) {
693                 props.add(timeStamp);
694             }
695
696             // Finally update
697             this.edgesDB.put(e, props);
698             log.trace("Edge {}  {}", e.toString(), type.name());
699             break;
700         }
701         return new TopoEdgeUpdate(e, props, type);
702     }
703
704     @Override
705     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
706         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
707         for (int i = 0; i < topoedgeupdateList.size(); i++) {
708             Edge e = topoedgeupdateList.get(i).getEdge();
709             Set<Property> p = topoedgeupdateList.get(i).getProperty();
710             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
711             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
712             if (teu != null) {
713                 teuList.add(teu);
714             }
715         }
716
717         if (!teuList.isEmpty()) {
718             // Now update the listeners
719             for (ITopologyManagerAware s : this.topologyManagerAware) {
720                 try {
721                     s.edgeUpdate(teuList);
722                 } catch (Exception exc) {
723                     log.error("Exception on edge update:", exc);
724                 }
725             }
726         }
727     }
728
729     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
730         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
731                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
732         return getLinkTuple(rLink);
733     }
734
735
736     private Edge getLinkTuple(TopologyUserLinkConfig link) {
737         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
738         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
739         try {
740             return new Edge(srcNodeConnector, dstNodeConnector);
741         } catch (Exception e) {
742             return null;
743         }
744     }
745
746     @Override
747     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
748         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
749     }
750
751     @Override
752     public Status addUserLink(TopologyUserLinkConfig userLink) {
753         if (!userLink.isValid()) {
754             return new Status(StatusCode.BADREQUEST,
755                     "User link configuration invalid.");
756         }
757         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
758
759         //Check if this link already configured
760         //NOTE: infinispan cache doesn't support Map.containsValue()
761         // (which is linear time in most ConcurrentMap impl anyway)
762         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
763             if (existingLink.equals(userLink)) {
764                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
765             }
766         }
767         //attempt put, if mapping for this key already existed return conflict
768         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
769             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
770                     + " already exists. Please use another name");
771         }
772
773         Edge linkTuple = getLinkTuple(userLink);
774         if (linkTuple != null) {
775             if (!isProductionLink(linkTuple)) {
776                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
777                                                 new HashSet<Property>());
778                 if (teu == null) {
779                     userLinksDB.remove(userLink.getName());
780                     return new Status(StatusCode.NOTFOUND,
781                            "Link configuration contains invalid node connector: "
782                            + userLink);
783                 }
784             }
785
786             linkTuple = getReverseLinkTuple(userLink);
787             if (linkTuple != null) {
788                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
789                 if (!isProductionLink(linkTuple)) {
790                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
791                 }
792             }
793         }
794         return new Status(StatusCode.SUCCESS);
795     }
796
797     @Override
798     public Status deleteUserLink(String linkName) {
799         if (linkName == null) {
800             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
801         }
802
803         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
804         Edge linkTuple;
805         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
806             if (! isProductionLink(linkTuple)) {
807                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
808             }
809
810             linkTuple = getReverseLinkTuple(link);
811             if (! isProductionLink(linkTuple)) {
812                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
813             }
814         }
815         return new Status(StatusCode.SUCCESS);
816     }
817
818     private void registerWithOSGIConsole() {
819         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
820                 .getBundleContext();
821         bundleContext.registerService(CommandProvider.class.getName(), this,
822                 null);
823     }
824
825     @Override
826     public String getHelp() {
827         StringBuffer help = new StringBuffer();
828         help.append("---Topology Manager---\n");
829         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
830         help.append("\t deleteUserLink <name>\n");
831         help.append("\t printUserLink\n");
832         help.append("\t printNodeEdges\n");
833         return help.toString();
834     }
835
836     public void _printUserLink(CommandInterpreter ci) {
837         for (String name : this.userLinksDB.keySet()) {
838             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
839             ci.println("Name : " + name);
840             ci.println(linkConfig);
841             ci.println("Edge " + getLinkTuple(linkConfig));
842             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
843         }
844     }
845
846     public void _addUserLink(CommandInterpreter ci) {
847         String name = ci.nextArgument();
848         if ((name == null)) {
849             ci.println("Please enter a valid Name");
850             return;
851         }
852
853         String ncStr1 = ci.nextArgument();
854         if (ncStr1 == null) {
855             ci.println("Please enter two node connector strings");
856             return;
857         }
858         String ncStr2 = ci.nextArgument();
859         if (ncStr2 == null) {
860             ci.println("Please enter second node connector string");
861             return;
862         }
863
864         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
865         if (nc1 == null) {
866             ci.println("Invalid input node connector 1 string: " + ncStr1);
867             return;
868         }
869         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
870         if (nc2 == null) {
871             ci.println("Invalid input node connector 2 string: " + ncStr2);
872             return;
873         }
874
875         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
876         ci.println(this.addUserLink(config));
877     }
878
879     public void _deleteUserLink(CommandInterpreter ci) {
880         String name = ci.nextArgument();
881         if ((name == null)) {
882             ci.println("Please enter a valid Name");
883             return;
884         }
885         this.deleteUserLink(name);
886     }
887
888     public void _printNodeEdges(CommandInterpreter ci) {
889         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
890         if (nodeEdges == null) {
891             return;
892         }
893         Set<Node> nodeSet = nodeEdges.keySet();
894         if (nodeSet == null) {
895             return;
896         }
897         ci.println("        Node                                         Edge");
898         for (Node node : nodeSet) {
899             Set<Edge> edgeSet = nodeEdges.get(node);
900             if (edgeSet == null) {
901                 continue;
902             }
903             for (Edge edge : edgeSet) {
904                 ci.println(node + "             " + edge);
905             }
906         }
907     }
908
909     @Override
910     public Object readObject(ObjectInputStream ois)
911             throws FileNotFoundException, IOException, ClassNotFoundException {
912         return ois.readObject();
913     }
914
915     @Override
916     public Status saveConfiguration() {
917         return saveConfig();
918     }
919
920     @Override
921     public void edgeOverUtilized(Edge edge) {
922         log.warn("Link Utilization above normal: {}", edge);
923     }
924
925     @Override
926     public void edgeUtilBackToNormal(Edge edge) {
927         log.warn("Link Utilization back to normal: {}", edge);
928     }
929
930     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
931         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
932         upd.setLocal(isLocal);
933         notifyQ.add(upd);
934     }
935
936     @Override
937     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
938         if (cacheName.equals(TOPOEDGESDB)) {
939             // This is the case of an Edge being added to the topology DB
940             final Edge e = (Edge) key;
941             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
942             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
943         }
944     }
945
946     @Override
947     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
948         if (cacheName.equals(TOPOEDGESDB)) {
949             final Edge e = (Edge) key;
950             log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
951             final Set<Property> props = (Set<Property>) new_value;
952             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
953         }
954     }
955
956     @Override
957     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
958         if (cacheName.equals(TOPOEDGESDB)) {
959             final Edge e = (Edge) key;
960             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
961             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
962         }
963     }
964
965     class TopologyNotify implements Runnable {
966         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
967         private TopoEdgeUpdate entry;
968         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
969         private boolean notifyListeners;
970
971         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
972             this.notifyQ = notifyQ;
973         }
974
975         @Override
976         public void run() {
977             while (true) {
978                 try {
979                     log.trace("New run of TopologyNotify");
980                     notifyListeners = false;
981                     // First we block waiting for an element to get in
982                     entry = notifyQ.take();
983                     // Then we drain the whole queue if elements are
984                     // in it without getting into any blocking condition
985                     for (; entry != null; entry = notifyQ.poll()) {
986                         teuList.add(entry);
987                         notifyListeners = true;
988                     }
989
990                     // Notify listeners only if there were updates drained else
991                     // give up
992                     if (notifyListeners) {
993                         log.trace("Notifier thread, notified a listener");
994                         // Now update the listeners
995                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
996                             try {
997                                 s.edgeUpdate(teuList);
998                             } catch (Exception exc) {
999                                 log.error("Exception on edge update:", exc);
1000                             }
1001                         }
1002                     }
1003                     teuList.clear();
1004
1005                     // Lets sleep for sometime to allow aggregation of event
1006                     Thread.sleep(100);
1007                 } catch (InterruptedException e1) {
1008                     if (shuttingDown) {
1009                         return;
1010                     }
1011                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
1012                 } catch (Exception e2) {
1013                     log.error("", e2);
1014                 }
1015             }
1016         }
1017     }
1018 }