Move adsal into its own subdirectory.
[controller.git] / opendaylight / adsal / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import org.apache.commons.lang3.tuple.ImmutablePair;
12 import org.apache.felix.dm.Component;
13 import org.eclipse.osgi.framework.console.CommandInterpreter;
14 import org.eclipse.osgi.framework.console.CommandProvider;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.configuration.ConfigurationObject;
21 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
22 import org.opendaylight.controller.configuration.IConfigurationContainerService;
23 import org.opendaylight.controller.sal.core.Edge;
24 import org.opendaylight.controller.sal.core.Host;
25 import org.opendaylight.controller.sal.core.Node;
26 import org.opendaylight.controller.sal.core.NodeConnector;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.controller.sal.core.TimeStamp;
29 import org.opendaylight.controller.sal.core.UpdateType;
30 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
31 import org.opendaylight.controller.sal.topology.ITopologyService;
32 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
33 import org.opendaylight.controller.sal.utils.IObjectReader;
34 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
35 import org.opendaylight.controller.sal.utils.Status;
36 import org.opendaylight.controller.sal.utils.StatusCode;
37 import org.opendaylight.controller.switchmanager.ISwitchManager;
38 import org.opendaylight.controller.topologymanager.ITopologyManager;
39 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
40 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
41 import org.opendaylight.controller.topologymanager.ITopologyManagerShell;
42 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
43 import org.osgi.framework.BundleContext;
44 import org.osgi.framework.FrameworkUtil;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import java.io.FileNotFoundException;
49 import java.io.IOException;
50 import java.io.ObjectInputStream;
51 import java.util.ArrayList;
52 import java.util.Dictionary;
53 import java.util.EnumSet;
54 import java.util.HashMap;
55 import java.util.HashSet;
56 import java.util.Iterator;
57 import java.util.LinkedList;
58 import java.util.List;
59 import java.util.Map;
60 import java.util.Set;
61 import java.util.concurrent.BlockingQueue;
62 import java.util.concurrent.ConcurrentHashMap;
63 import java.util.concurrent.ConcurrentMap;
64 import java.util.concurrent.CopyOnWriteArraySet;
65 import java.util.concurrent.LinkedBlockingQueue;
66
67 /**
68  * The class describes TopologyManager which is the central repository of the
69  * network topology. It provides service for applications to interact with
70  * topology database and notifies all the listeners of topology changes.
71  */
72 public class TopologyManagerImpl implements
73         ICacheUpdateAware<Object, Object>,
74         ITopologyManager,
75         ITopologyManagerShell,
76         IConfigurationContainerAware,
77         IListenTopoUpdates,
78         IObjectReader,
79         CommandProvider {
80     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
81     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
82     protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
83     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
84     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
85     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
86     private ITopologyService topoService;
87     private IClusterContainerServices clusterContainerService;
88     private IConfigurationContainerService configurationService;
89     private ISwitchManager switchManager;
90     // DB of all the Edges with properties which constitute our topology
91     private ConcurrentMap<Edge, Set<Property>> edgesDB;
92     // DB of all NodeConnector which are part of ISL Edges, meaning they
93     // are connected to another NodeConnector on the other side of an ISL link.
94     // NodeConnector of a Production Edge is not part of this DB.
95     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
96     // DB of all the NodeConnectors with an Host attached to it
97     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
98     // Topology Manager Aware listeners
99     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
100     // Topology Manager Aware listeners - for clusterwide updates
101     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
102             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
103     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
104     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
105     private volatile Boolean shuttingDown = false;
106     private Thread notifyThread;
107
108
109     void nonClusterObjectCreate() {
110         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
111         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
112         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
113         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
114     }
115
116     void setTopologyManagerAware(ITopologyManagerAware s) {
117         if (this.topologyManagerAware != null) {
118             log.debug("Adding ITopologyManagerAware: {}", s);
119             this.topologyManagerAware.add(s);
120         }
121     }
122
123     void unsetTopologyManagerAware(ITopologyManagerAware s) {
124         if (this.topologyManagerAware != null) {
125             log.debug("Removing ITopologyManagerAware: {}", s);
126             this.topologyManagerAware.remove(s);
127         }
128     }
129
130     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
131         if (this.topologyManagerClusterWideAware != null) {
132             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
133             this.topologyManagerClusterWideAware.add(s);
134         }
135     }
136
137     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
138         if (this.topologyManagerClusterWideAware != null) {
139             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
140             this.topologyManagerClusterWideAware.remove(s);
141         }
142     }
143
144     void setTopoService(ITopologyService s) {
145         log.debug("Adding ITopologyService: {}", s);
146         this.topoService = s;
147     }
148
149     void unsetTopoService(ITopologyService s) {
150         if (this.topoService == s) {
151             log.debug("Removing ITopologyService: {}", s);
152             this.topoService = null;
153         }
154     }
155
156     void setClusterContainerService(IClusterContainerServices s) {
157         log.debug("Cluster Service set");
158         this.clusterContainerService = s;
159     }
160
161     void unsetClusterContainerService(IClusterContainerServices s) {
162         if (this.clusterContainerService == s) {
163             log.debug("Cluster Service removed!");
164             this.clusterContainerService = null;
165         }
166     }
167
168     public void setConfigurationContainerService(IConfigurationContainerService service) {
169         log.trace("Got configuration service set request {}", service);
170         this.configurationService = service;
171     }
172
173     public void unsetConfigurationContainerService(IConfigurationContainerService service) {
174         log.trace("Got configuration service UNset request");
175         this.configurationService = null;
176     }
177
178     void setSwitchManager(ISwitchManager s) {
179         log.debug("Adding ISwitchManager: {}", s);
180         this.switchManager = s;
181     }
182
183     void unsetSwitchManager(ISwitchManager s) {
184         if (this.switchManager == s) {
185             log.debug("Removing ISwitchManager: {}", s);
186             this.switchManager = null;
187         }
188     }
189
190     /**
191      * Function called by the dependency manager when all the required
192      * dependencies are satisfied
193      *
194      */
195     void init(Component c) {
196         allocateCaches();
197         retrieveCaches();
198         String containerName = null;
199         Dictionary<?, ?> props = c.getServiceProperties();
200         if (props != null) {
201             containerName = (String) props.get("containerName");
202         } else {
203             // In the Global instance case the containerName is empty
204             containerName = "UNKNOWN";
205         }
206
207         registerWithOSGIConsole();
208         loadConfiguration();
209
210         // Restore the shuttingDown status on init of the component
211         shuttingDown = false;
212         notifyThread = new Thread(new TopologyNotify(notifyQ));
213     }
214
215     @SuppressWarnings({ "unchecked" })
216     private void allocateCaches() {
217             this.edgesDB =
218                     (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
219
220             this.hostsDB =
221                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
222
223             this.nodeConnectorsDB =
224                     (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
225                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
226             this.userLinksDB =
227                     (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
228                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
229     }
230
231     private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
232         ConcurrentMap<?, ?> cache = null;
233         try {
234             cache = this.clusterContainerService.createCache(cacheName, cacheModes);
235         } catch (CacheExistException e) {
236             log.debug(cacheName + " cache already exists - destroy and recreate if needed");
237         } catch (CacheConfigException e) {
238             log.error(cacheName + " cache configuration invalid - check cache mode");
239         }
240         return cache;
241     }
242
243     @SuppressWarnings({ "unchecked" })
244     private void retrieveCaches() {
245         if (this.clusterContainerService == null) {
246             log.error("Cluster Services is null, can't retrieve caches.");
247             return;
248         }
249
250         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
251         if (edgesDB == null) {
252             log.error("Failed to get cache for " + TOPOEDGESDB);
253         }
254
255         this.hostsDB =
256                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
257         if (hostsDB == null) {
258             log.error("Failed to get cache for " + TOPOHOSTSDB);
259         }
260
261         this.nodeConnectorsDB =
262                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
263         if (nodeConnectorsDB == null) {
264             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
265         }
266
267         this.userLinksDB =
268                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
269         if (userLinksDB == null) {
270             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
271         }
272     }
273
274     /**
275      * Function called after the topology manager has registered the service in
276      * OSGi service registry.
277      *
278      */
279     void started() {
280         // Start the batcher thread for the cluster wide topology updates
281         notifyThread.start();
282         // SollicitRefresh MUST be called here else if called at init
283         // time it may sollicit refresh too soon.
284         log.debug("Sollicit topology refresh");
285         topoService.sollicitRefresh();
286     }
287
288     void stop() {
289         shuttingDown = true;
290         notifyThread.interrupt();
291     }
292
293     /**
294      * Function called by the dependency manager when at least one dependency
295      * become unsatisfied or when the component is shutting down because for
296      * example bundle is being stopped.
297      *
298      */
299     void destroy() {
300         notifyQ.clear();
301         notifyThread = null;
302     }
303
304     private void loadConfiguration() {
305         for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
306             addUserLink((TopologyUserLinkConfig) conf);
307         }
308     }
309
310     @Override
311     public Status saveConfig() {
312         return saveConfigInternal();
313     }
314
315     public Status saveConfigInternal() {
316         Status saveStatus = configurationService.persistConfiguration(
317                 new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
318
319         if (!saveStatus.isSuccess()) {
320             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
321         }
322         return saveStatus;
323     }
324
325     @Override
326     public Map<Node, Set<Edge>> getNodeEdges() {
327         if (this.edgesDB == null) {
328             return null;
329         }
330
331         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
332         for (Edge edge : this.edgesDB.keySet()) {
333             // Lets analyze the tail
334             Node node = edge.getTailNodeConnector().getNode();
335             Set<Edge> nodeEdges = res.get(node);
336             if (nodeEdges == null) {
337                 nodeEdges = new HashSet<Edge>();
338                 res.put(node, nodeEdges);
339             }
340             nodeEdges.add(edge);
341
342             // Lets analyze the head
343             node = edge.getHeadNodeConnector().getNode();
344             nodeEdges = res.get(node);
345             if (nodeEdges == null) {
346                 nodeEdges = new HashSet<Edge>();
347                 res.put(node, nodeEdges);
348             }
349             nodeEdges.add(edge);
350         }
351
352         return res;
353     }
354
355     @Override
356     public boolean isInternal(NodeConnector p) {
357         if (this.nodeConnectorsDB == null) {
358             return false;
359         }
360
361         // This is an internal NodeConnector if is connected to
362         // another Node i.e it's part of the nodeConnectorsDB
363         return (this.nodeConnectorsDB.get(p) != null);
364     }
365
366     /**
367      * This method returns true if the edge is an ISL link.
368      *
369      * @param e
370      *            The edge
371      * @return true if it is an ISL link
372      */
373     public boolean isISLink(Edge e) {
374         return (!isProductionLink(e));
375     }
376
377     /**
378      * This method returns true if the edge is a production link.
379      *
380      * @param e
381      *            The edge
382      * @return true if it is a production link
383      */
384     public boolean isProductionLink(Edge e) {
385         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
386                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
387     }
388
389     /**
390      * This method cross checks the determination of nodeConnector type by Discovery Service
391      * against the information in SwitchManager and updates it accordingly.
392      * @param e
393      *          The edge
394      */
395     private void crossCheckNodeConnectors(Edge e) {
396         NodeConnector nc;
397         if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
398             nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
399             if (nc != null) {
400                 e.setHeadNodeConnector(nc);
401             }
402         }
403         if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
404             nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
405             if (nc != null) {
406                 e.setTailNodeConnector(nc);
407             }
408         }
409     }
410
411     /**
412      * A NodeConnector may have been categorized as of type Production by Discovery Service.
413      * But at the time when this determination was made, only OF nodes were known to Discovery
414      * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
415      * so, then it returns a new NodeConnector with correct type.
416      *
417      * @param nc
418      *       NodeConnector as passed on in the edge
419      * @return
420      *       If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
421      *       with correct type, null otherwise
422      */
423
424     private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
425
426         for (Node node : switchManager.getNodes()) {
427             String nodeName = node.getNodeIDString();
428             log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
429                     nc.getNode().getNodeIDString());
430             if (nodeName.equals(nc.getNode().getNodeIDString())) {
431                 NodeConnector nodeConnector = NodeConnectorCreator
432                         .createNodeConnector(node.getType(), nc.getID(), node);
433                 return nodeConnector;
434             }
435         }
436         return null;
437     }
438
439     /**
440      * The Map returned is a copy of the current topology hence if the topology
441      * changes the copy doesn't
442      *
443      * @return A Map representing the current topology expressed as edges of the
444      *         network
445      */
446     @Override
447     public Map<Edge, Set<Property>> getEdges() {
448         if (this.edgesDB == null) {
449             return null;
450         }
451
452         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
453         Set<Property> props;
454         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
455             // Sets of props are copied because the composition of
456             // those properties could change with time
457             props = new HashSet<Property>(edgeEntry.getValue());
458             // We can simply reuse the key because the object is
459             // immutable so doesn't really matter that we are
460             // referencing the only owned by a different table, the
461             // meaning is the same because doesn't change with time.
462             edgeMap.put(edgeEntry.getKey(), props);
463         }
464
465         return edgeMap;
466     }
467
468     @Override
469     public Set<NodeConnector> getNodeConnectorWithHost() {
470         if (this.hostsDB == null) {
471             return null;
472         }
473
474         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
475     }
476
477     @Override
478     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
479         if (this.hostsDB == null) {
480             return null;
481         }
482         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
483         Node node;
484         Set<NodeConnector> portSet;
485         for (NodeConnector nc : this.hostsDB.keySet()) {
486             node = nc.getNode();
487             portSet = res.get(node);
488             if (portSet == null) {
489                 // Create the HashSet if null
490                 portSet = new HashSet<NodeConnector>();
491                 res.put(node, portSet);
492             }
493
494             // Keep updating the HashSet, given this is not a
495             // clustered map we can just update the set without
496             // worrying to update the hashmap.
497             portSet.add(nc);
498         }
499
500         return (res);
501     }
502
503     @Override
504     public Host getHostAttachedToNodeConnector(NodeConnector port) {
505         List<Host> hosts = getHostsAttachedToNodeConnector(port);
506         if(hosts != null && !hosts.isEmpty()){
507             return hosts.get(0);
508         }
509         return null;
510     }
511
512     @Override
513     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
514         Set<ImmutablePair<Host, Set<Property>>> hosts;
515         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
516             return null;
517         }
518         // create a list of hosts
519         List<Host> retHosts = new LinkedList<Host>();
520         for(ImmutablePair<Host, Set<Property>> host : hosts) {
521             retHosts.add(host.getLeft());
522         }
523         return retHosts;
524     }
525
526     @Override
527     public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
528
529         // Clone the property set in case non null else just
530         // create an empty one. Caches allocated via infinispan
531         // don't allow null values
532         if (props == null) {
533             props = new HashSet<Property>();
534         } else {
535             props = new HashSet<Property>(props);
536         }
537         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
538
539         // get the host list
540         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
541         if(hostSet == null) {
542             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
543         }
544         switch (t) {
545         case ADDED:
546         case CHANGED:
547             hostSet.add(thisHost);
548             this.hostsDB.put(port, hostSet);
549             break;
550         case REMOVED:
551             hostSet.remove(thisHost);
552             if(hostSet.isEmpty()) {
553                 //remove only if hasn't been concurrently modified
554                 this.hostsDB.remove(port, hostSet);
555             } else {
556                 this.hostsDB.put(port, hostSet);
557             }
558             break;
559         }
560     }
561
562     private boolean headNodeConnectorExist(Edge e) {
563         /*
564          * Only check the head end point which is supposed to be part of a
565          * network node we control (present in our inventory). If we checked the
566          * tail end point as well, we would not store the edges that connect to
567          * a non sdn enable port on a non sdn capable production switch. We want
568          * to be able to see these switches on the topology.
569          */
570         NodeConnector head = e.getHeadNodeConnector();
571         return (switchManager.doesNodeConnectorExist(head));
572     }
573
574     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
575         switch (type) {
576         case ADDED:
577
578
579             if (this.edgesDB.containsKey(e)) {
580                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
581                 log.trace("Skipping redundant edge addition: {}", e);
582                 return null;
583             }
584
585             // Make sure the props are non-null or create a copy
586             if (props == null) {
587                 props = new HashSet<Property>();
588             } else {
589                 props = new HashSet<Property>(props);
590             }
591
592
593             // Ensure that head node connector exists
594             if (!headNodeConnectorExist(e)) {
595                 log.warn("Ignore edge that contains invalid node connector: {}", e);
596                 return null;
597             }
598
599             // Check if nodeConnectors of the edge were correctly categorized
600             // by protocol plugin
601             crossCheckNodeConnectors(e);
602
603             // Now make sure there is the creation timestamp for the
604             // edge, if not there, stamp with the first update
605             boolean found_create = false;
606             for (Property prop : props) {
607                 if (prop instanceof TimeStamp) {
608                     TimeStamp t = (TimeStamp) prop;
609                     if (t.getTimeStampName().equals("creation")) {
610                         found_create = true;
611                         break;
612                     }
613                 }
614             }
615
616             if (!found_create) {
617                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
618                 props.add(t);
619             }
620
621             // Now add this in the database eventually overriding
622             // something that may have been already existing
623             this.edgesDB.put(e, props);
624
625             // Now populate the DB of NodeConnectors
626             // NOTE WELL: properties are empty sets, not really needed
627             // for now.
628             // The DB only contains ISL ports
629             if (isISLink(e)) {
630                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
631                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
632             }
633             log.trace("Edge {}  {}", e.toString(), type.name());
634             break;
635         case REMOVED:
636             // Now remove the edge from edgesDB
637             this.edgesDB.remove(e);
638
639             // Now lets update the NodeConnectors DB, the assumption
640             // here is that two NodeConnector are exclusively
641             // connected by 1 and only 1 edge, this is reasonable in
642             // the same plug (virtual of phisical) we can assume two
643             // cables won't be plugged. This could break only in case
644             // of devices in the middle that acts as hubs, but it
645             // should be safe to assume that won't happen.
646             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
647             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
648             log.trace("Edge {}  {}", e.toString(), type.name());
649             break;
650         case CHANGED:
651             Set<Property> oldProps = this.edgesDB.get(e);
652
653             // When property(s) changes lets make sure we can change it
654             // all except the creation time stamp because that should
655             // be set only when the edge is created
656             TimeStamp timeStamp = null;
657             if (oldProps != null) {
658                 for (Property prop : oldProps) {
659                     if (prop instanceof TimeStamp) {
660                         TimeStamp tsProp = (TimeStamp) prop;
661                         if (tsProp.getTimeStampName().equals("creation")) {
662                             timeStamp = tsProp;
663                             break;
664                         }
665                     }
666                 }
667             }
668
669             // Now lets make sure new properties are non-null
670             if (props == null) {
671                 props = new HashSet<Property>();
672             } else {
673                 // Copy the set so noone is going to change the content
674                 props = new HashSet<Property>(props);
675             }
676
677             // Now lets remove the creation property if exist in the
678             // new props
679             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
680                 Property prop = i.next();
681                 if (prop instanceof TimeStamp) {
682                     TimeStamp t = (TimeStamp) prop;
683                     if (t.getTimeStampName().equals("creation")) {
684                         if (timeStamp != null) {
685                             i.remove();
686                         }
687                         break;
688                     }
689                 }
690             }
691
692             // Now lets add the creation timestamp in it
693             if (timeStamp != null) {
694                 props.add(timeStamp);
695             }
696
697             // Finally update
698             this.edgesDB.put(e, props);
699             log.trace("Edge {}  {}", e.toString(), type.name());
700             break;
701         }
702         return new TopoEdgeUpdate(e, props, type);
703     }
704
705     @Override
706     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
707         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
708         for (int i = 0; i < topoedgeupdateList.size(); i++) {
709             Edge e = topoedgeupdateList.get(i).getEdge();
710             Set<Property> p = topoedgeupdateList.get(i).getProperty();
711             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
712             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
713             if (teu != null) {
714                 teuList.add(teu);
715             }
716         }
717
718         if (!teuList.isEmpty()) {
719             // Now update the listeners
720             for (ITopologyManagerAware s : this.topologyManagerAware) {
721                 try {
722                     s.edgeUpdate(teuList);
723                 } catch (Exception exc) {
724                     log.error("Exception on edge update:", exc);
725                 }
726             }
727         }
728     }
729
730     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
731         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
732                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
733         return getLinkTuple(rLink);
734     }
735
736
737     private Edge getLinkTuple(TopologyUserLinkConfig link) {
738         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
739         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
740         try {
741             return new Edge(srcNodeConnector, dstNodeConnector);
742         } catch (Exception e) {
743             return null;
744         }
745     }
746
747     @Override
748     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
749         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
750     }
751
752     @Override
753     public Status addUserLink(TopologyUserLinkConfig userLink) {
754         if (!userLink.isValid()) {
755             return new Status(StatusCode.BADREQUEST,
756                     "User link configuration invalid.");
757         }
758         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
759
760         //Check if this link already configured
761         //NOTE: infinispan cache doesn't support Map.containsValue()
762         // (which is linear time in most ConcurrentMap impl anyway)
763         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
764             if (existingLink.equals(userLink)) {
765                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
766             }
767         }
768         //attempt put, if mapping for this key already existed return conflict
769         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
770             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
771                     + " already exists. Please use another name");
772         }
773
774         Edge linkTuple = getLinkTuple(userLink);
775         if (linkTuple != null) {
776             if (!isProductionLink(linkTuple)) {
777                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
778                                                 new HashSet<Property>());
779                 if (teu == null) {
780                     userLinksDB.remove(userLink.getName());
781                     return new Status(StatusCode.NOTFOUND,
782                            "Link configuration contains invalid node connector: "
783                            + userLink);
784                 }
785             }
786
787             linkTuple = getReverseLinkTuple(userLink);
788             if (linkTuple != null) {
789                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
790                 if (!isProductionLink(linkTuple)) {
791                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
792                 }
793             }
794         }
795         return new Status(StatusCode.SUCCESS);
796     }
797
798     @Override
799     public Status deleteUserLink(String linkName) {
800         if (linkName == null) {
801             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
802         }
803
804         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
805         Edge linkTuple;
806         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
807             if (! isProductionLink(linkTuple)) {
808                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
809             }
810
811             linkTuple = getReverseLinkTuple(link);
812             if (! isProductionLink(linkTuple)) {
813                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
814             }
815         }
816         return new Status(StatusCode.SUCCESS);
817     }
818
819     private void registerWithOSGIConsole() {
820         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
821                 .getBundleContext();
822         bundleContext.registerService(CommandProvider.class.getName(), this,
823                 null);
824     }
825
826     @Override
827     public String getHelp() {
828         StringBuffer help = new StringBuffer();
829         help.append("---Topology Manager---\n");
830         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
831         help.append("\t deleteUserLink <name>\n");
832         help.append("\t printUserLink\n");
833         help.append("\t printNodeEdges\n");
834         return help.toString();
835     }
836
837     public void _printUserLink(CommandInterpreter ci) {
838         for (String name : this.userLinksDB.keySet()) {
839             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
840             ci.println("Name : " + name);
841             ci.println(linkConfig);
842             ci.println("Edge " + getLinkTuple(linkConfig));
843             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
844         }
845     }
846
847     public void _addUserLink(CommandInterpreter ci) {
848         String name = ci.nextArgument();
849         if ((name == null)) {
850             ci.println("Please enter a valid Name");
851             return;
852         }
853
854         String ncStr1 = ci.nextArgument();
855         if (ncStr1 == null) {
856             ci.println("Please enter two node connector strings");
857             return;
858         }
859         String ncStr2 = ci.nextArgument();
860         if (ncStr2 == null) {
861             ci.println("Please enter second node connector string");
862             return;
863         }
864
865         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
866         if (nc1 == null) {
867             ci.println("Invalid input node connector 1 string: " + ncStr1);
868             return;
869         }
870         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
871         if (nc2 == null) {
872             ci.println("Invalid input node connector 2 string: " + ncStr2);
873             return;
874         }
875
876         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
877         ci.println(this.addUserLink(config));
878     }
879
880     public void _deleteUserLink(CommandInterpreter ci) {
881         String name = ci.nextArgument();
882         if ((name == null)) {
883             ci.println("Please enter a valid Name");
884             return;
885         }
886         this.deleteUserLink(name);
887     }
888
889     public void _printNodeEdges(CommandInterpreter ci) {
890         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
891         if (nodeEdges == null) {
892             return;
893         }
894         Set<Node> nodeSet = nodeEdges.keySet();
895         if (nodeSet == null) {
896             return;
897         }
898         ci.println("        Node                                         Edge");
899         for (Node node : nodeSet) {
900             Set<Edge> edgeSet = nodeEdges.get(node);
901             if (edgeSet == null) {
902                 continue;
903             }
904             for (Edge edge : edgeSet) {
905                 ci.println(node + "             " + edge);
906             }
907         }
908     }
909
910     @Override
911     public Object readObject(ObjectInputStream ois)
912             throws FileNotFoundException, IOException, ClassNotFoundException {
913         return ois.readObject();
914     }
915
916     @Override
917     public Status saveConfiguration() {
918         return saveConfig();
919     }
920
921     @Override
922     public void edgeOverUtilized(Edge edge) {
923         log.warn("Link Utilization above normal: {}", edge);
924     }
925
926     @Override
927     public void edgeUtilBackToNormal(Edge edge) {
928         log.warn("Link Utilization back to normal: {}", edge);
929     }
930
931     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
932         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
933         upd.setLocal(isLocal);
934         notifyQ.add(upd);
935     }
936
937     @Override
938     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
939         if (cacheName.equals(TOPOEDGESDB)) {
940             // This is the case of an Edge being added to the topology DB
941             final Edge e = (Edge) key;
942             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
943             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
944         }
945     }
946
947     @Override
948     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
949         if (cacheName.equals(TOPOEDGESDB)) {
950             final Edge e = (Edge) key;
951             log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
952             final Set<Property> props = (Set<Property>) new_value;
953             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
954         }
955     }
956
957     @Override
958     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
959         if (cacheName.equals(TOPOEDGESDB)) {
960             final Edge e = (Edge) key;
961             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
962             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
963         }
964     }
965
966     class TopologyNotify implements Runnable {
967         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
968         private TopoEdgeUpdate entry;
969         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
970         private boolean notifyListeners;
971
972         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
973             this.notifyQ = notifyQ;
974         }
975
976         @Override
977         public void run() {
978             while (true) {
979                 try {
980                     log.trace("New run of TopologyNotify");
981                     notifyListeners = false;
982                     // First we block waiting for an element to get in
983                     entry = notifyQ.take();
984                     // Then we drain the whole queue if elements are
985                     // in it without getting into any blocking condition
986                     for (; entry != null; entry = notifyQ.poll()) {
987                         teuList.add(entry);
988                         notifyListeners = true;
989                     }
990
991                     // Notify listeners only if there were updates drained else
992                     // give up
993                     if (notifyListeners) {
994                         log.trace("Notifier thread, notified a listener");
995                         // Now update the listeners
996                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
997                             try {
998                                 s.edgeUpdate(teuList);
999                             } catch (Exception exc) {
1000                                 log.error("Exception on edge update:", exc);
1001                             }
1002                         }
1003                     }
1004                     teuList.clear();
1005
1006                     // Lets sleep for sometime to allow aggregation of event
1007                     Thread.sleep(100);
1008                 } catch (InterruptedException e1) {
1009                     if (shuttingDown) {
1010                         return;
1011                     }
1012                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
1013                 } catch (Exception e2) {
1014                     log.error("", e2);
1015                 }
1016             }
1017         }
1018     }
1019
1020     public List<String> printUserLink() {
1021         List<String> result = new ArrayList<String>();
1022         for (String name : this.userLinksDB.keySet()) {
1023             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1024             result.add("Name : " + name);
1025             result.add(linkConfig.toString());
1026             result.add("Edge " + getLinkTuple(linkConfig));
1027             result.add("Reverse Edge " + getReverseLinkTuple(linkConfig));
1028         }
1029         return result;
1030     }
1031
1032     public List<String> addUserLink(String name, String ncStr1, String ncStr2) {
1033         List<String> result = new ArrayList<String>();
1034         if ((name == null)) {
1035             result.add("Please enter a valid Name");
1036             return result;
1037         }
1038
1039         if (ncStr1 == null) {
1040             result.add("Please enter two node connector strings");
1041             return result;
1042         }
1043         if (ncStr2 == null) {
1044             result.add("Please enter second node connector string");
1045             return result;
1046         }
1047
1048         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1049         if (nc1 == null) {
1050             result.add("Invalid input node connector 1 string: " + ncStr1);
1051             return result;
1052         }
1053         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1054         if (nc2 == null) {
1055             result.add("Invalid input node connector 2 string: " + ncStr2);
1056             return result;
1057         }
1058
1059         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1060         result.add(this.addUserLink(config).toString());
1061         return result;
1062     }
1063
1064     public List<String> deleteUserLinkShell(String name) {
1065         List<String> result = new ArrayList<String>();
1066         if ((name == null)) {
1067             result.add("Please enter a valid Name");
1068             return result;
1069         }
1070         this.deleteUserLink(name);
1071         return result;
1072     }
1073
1074     public List<String> printNodeEdges() {
1075         List<String> result = new ArrayList<String>();
1076         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1077         if (nodeEdges == null) {
1078             return result;
1079         }
1080         Set<Node> nodeSet = nodeEdges.keySet();
1081         if (nodeSet == null) {
1082             return result;
1083         }
1084         result.add("        Node                                         Edge");
1085         for (Node node : nodeSet) {
1086             Set<Edge> edgeSet = nodeEdges.get(node);
1087             if (edgeSet == null) {
1088                 continue;
1089             }
1090             for (Edge edge : edgeSet) {
1091                 result.add(node + "             " + edge);
1092             }
1093         }
1094         return result;
1095     }
1096
1097 }