Merge "Fix test case mis-spelling."
[controller.git] / opendaylight / adsal / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import org.apache.commons.lang3.tuple.ImmutablePair;
12 import org.apache.felix.dm.Component;
13 import org.eclipse.osgi.framework.console.CommandInterpreter;
14 import org.eclipse.osgi.framework.console.CommandProvider;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.configuration.ConfigurationObject;
21 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
22 import org.opendaylight.controller.configuration.IConfigurationContainerService;
23 import org.opendaylight.controller.sal.core.Edge;
24 import org.opendaylight.controller.sal.core.Host;
25 import org.opendaylight.controller.sal.core.Node;
26 import org.opendaylight.controller.sal.core.NodeConnector;
27 import org.opendaylight.controller.sal.core.Property;
28 import org.opendaylight.controller.sal.core.TimeStamp;
29 import org.opendaylight.controller.sal.core.UpdateType;
30 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
31 import org.opendaylight.controller.sal.topology.ITopologyService;
32 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
33 import org.opendaylight.controller.sal.utils.IObjectReader;
34 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
35 import org.opendaylight.controller.sal.utils.Status;
36 import org.opendaylight.controller.sal.utils.StatusCode;
37 import org.opendaylight.controller.switchmanager.IInventoryListener;
38 import org.opendaylight.controller.switchmanager.ISwitchManager;
39 import org.opendaylight.controller.topologymanager.ITopologyManager;
40 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
41 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
42 import org.opendaylight.controller.topologymanager.ITopologyManagerShell;
43 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
44 import org.osgi.framework.BundleContext;
45 import org.osgi.framework.FrameworkUtil;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import java.io.FileNotFoundException;
50 import java.io.IOException;
51 import java.io.ObjectInputStream;
52 import java.util.ArrayList;
53 import java.util.Dictionary;
54 import java.util.EnumSet;
55 import java.util.HashMap;
56 import java.util.HashSet;
57 import java.util.Iterator;
58 import java.util.LinkedList;
59 import java.util.List;
60 import java.util.Map;
61 import java.util.Set;
62 import java.util.Timer;
63 import java.util.TimerTask;
64 import java.util.concurrent.BlockingQueue;
65 import java.util.concurrent.ConcurrentHashMap;
66 import java.util.concurrent.ConcurrentMap;
67 import java.util.concurrent.CopyOnWriteArraySet;
68 import java.util.concurrent.LinkedBlockingQueue;
69
70 /**
71  * The class describes TopologyManager which is the central repository of the
72  * network topology. It provides service for applications to interact with
73  * topology database and notifies all the listeners of topology changes.
74  */
75 public class TopologyManagerImpl implements
76         ICacheUpdateAware<Object, Object>,
77         ITopologyManager,
78         ITopologyManagerShell,
79         IConfigurationContainerAware,
80         IListenTopoUpdates,
81         IObjectReader,
82         IInventoryListener,
83         CommandProvider {
84     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
85     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
86     protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
87     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
88     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
89     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
90     private static final long PENDING_UPDATE_TIMEOUT = 5000L;
91
92     private ITopologyService topoService;
93     private IClusterContainerServices clusterContainerService;
94     private IConfigurationContainerService configurationService;
95     private ISwitchManager switchManager;
96     // DB of all the Edges with properties which constitute our topology
97     private ConcurrentMap<Edge, Set<Property>> edgesDB;
98     // DB of all NodeConnector which are part of ISL Edges, meaning they
99     // are connected to another NodeConnector on the other side of an ISL link.
100     // NodeConnector of a Production Edge is not part of this DB.
101     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
102     // DB of all the NodeConnectors with an Host attached to it
103     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
104     // Topology Manager Aware listeners
105     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
106     // Topology Manager Aware listeners - for clusterwide updates
107     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
108             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
109     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
110     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
111     private volatile Boolean shuttingDown = false;
112     private Thread notifyThread;
113     private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
114         new HashMap<NodeConnector, List<PendingUpdateTask>>();
115     private final BlockingQueue<TopoEdgeUpdate> updateQ =
116         new LinkedBlockingQueue<TopoEdgeUpdate>();
117     private Timer pendingTimer;
118     private Thread updateThread;
119
120     private class PendingEdgeUpdate extends TopoEdgeUpdate {
121         private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
122             super(e, p, t);
123         }
124     }
125
126     private class UpdateTopology implements Runnable {
127         @Override
128         public void run() {
129             log.trace("Start topology update thread");
130
131             while (!shuttingDown) {
132                 try {
133                     List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
134                     TopoEdgeUpdate teu = updateQ.take();
135                     for (; teu != null; teu = updateQ.poll()) {
136                         list.add(teu);
137                     }
138
139                     if (!list.isEmpty()) {
140                         log.trace("Update edges: {}", list);
141                         doEdgeUpdate(list);
142                     }
143                 } catch (InterruptedException e) {
144                     if (shuttingDown) {
145                         break;
146                     }
147                     log.warn("Topology update thread interrupted", e);
148                 } catch (Exception e) {
149                     log.error("Exception on topology update thread", e);
150                 }
151             }
152
153             log.trace("Exit topology update thread");
154         }
155     }
156
157     private class PendingUpdateTask extends TimerTask {
158         private final Edge  edge;
159         private final Set<Property>  props;
160         private final UpdateType  type;
161
162         private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
163             edge = e;
164             props = p;
165             type = t;
166         }
167
168         private NodeConnector getHeadNodeConnector() {
169             return edge.getHeadNodeConnector();
170         }
171
172         private void flush() {
173             log.info("Flush pending topology update: edge {}, type {}",
174                      edge, type);
175             updateQ.add(new PendingEdgeUpdate(edge, props, type));
176         }
177
178         @Override
179         public void run() {
180             if (removePendingEvent(this)) {
181                 log.warn("Pending topology update timed out: edge{}, type {}",
182                          edge, type);
183             }
184         }
185     }
186
187     void nonClusterObjectCreate() {
188         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
189         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
190         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
191         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
192     }
193
194     void setTopologyManagerAware(ITopologyManagerAware s) {
195         if (this.topologyManagerAware != null) {
196             log.debug("Adding ITopologyManagerAware: {}", s);
197             this.topologyManagerAware.add(s);
198         }
199     }
200
201     void unsetTopologyManagerAware(ITopologyManagerAware s) {
202         if (this.topologyManagerAware != null) {
203             log.debug("Removing ITopologyManagerAware: {}", s);
204             this.topologyManagerAware.remove(s);
205         }
206     }
207
208     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
209         if (this.topologyManagerClusterWideAware != null) {
210             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
211             this.topologyManagerClusterWideAware.add(s);
212         }
213     }
214
215     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
216         if (this.topologyManagerClusterWideAware != null) {
217             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
218             this.topologyManagerClusterWideAware.remove(s);
219         }
220     }
221
222     void setTopoService(ITopologyService s) {
223         log.debug("Adding ITopologyService: {}", s);
224         this.topoService = s;
225     }
226
227     void unsetTopoService(ITopologyService s) {
228         if (this.topoService == s) {
229             log.debug("Removing ITopologyService: {}", s);
230             this.topoService = null;
231         }
232     }
233
234     void setClusterContainerService(IClusterContainerServices s) {
235         log.debug("Cluster Service set");
236         this.clusterContainerService = s;
237     }
238
239     void unsetClusterContainerService(IClusterContainerServices s) {
240         if (this.clusterContainerService == s) {
241             log.debug("Cluster Service removed!");
242             this.clusterContainerService = null;
243         }
244     }
245
246     public void setConfigurationContainerService(IConfigurationContainerService service) {
247         log.trace("Got configuration service set request {}", service);
248         this.configurationService = service;
249     }
250
251     public void unsetConfigurationContainerService(IConfigurationContainerService service) {
252         log.trace("Got configuration service UNset request");
253         this.configurationService = null;
254     }
255
256     void setSwitchManager(ISwitchManager s) {
257         log.debug("Adding ISwitchManager: {}", s);
258         this.switchManager = s;
259     }
260
261     void unsetSwitchManager(ISwitchManager s) {
262         if (this.switchManager == s) {
263             log.debug("Removing ISwitchManager: {}", s);
264             this.switchManager = null;
265         }
266     }
267
268     /**
269      * Function called by the dependency manager when all the required
270      * dependencies are satisfied
271      *
272      */
273     void init(Component c) {
274         allocateCaches();
275         retrieveCaches();
276         String containerName = null;
277         Dictionary<?, ?> props = c.getServiceProperties();
278         if (props != null) {
279             containerName = (String) props.get("containerName");
280         } else {
281             // In the Global instance case the containerName is empty
282             containerName = "UNKNOWN";
283         }
284
285         registerWithOSGIConsole();
286         loadConfiguration();
287
288         // Restore the shuttingDown status on init of the component
289         shuttingDown = false;
290         notifyThread = new Thread(new TopologyNotify(notifyQ));
291         pendingTimer = new Timer("Topology Pending Update Timer");
292         updateThread = new Thread(new UpdateTopology(), "Topology Update");
293     }
294
295     @SuppressWarnings({ "unchecked" })
296     private void allocateCaches() {
297             this.edgesDB =
298                     (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
299
300             this.hostsDB =
301                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
302
303             this.nodeConnectorsDB =
304                     (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
305                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
306             this.userLinksDB =
307                     (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
308                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
309     }
310
311     private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
312         ConcurrentMap<?, ?> cache = null;
313         try {
314             cache = this.clusterContainerService.createCache(cacheName, cacheModes);
315         } catch (CacheExistException e) {
316             log.debug(cacheName + " cache already exists - destroy and recreate if needed");
317         } catch (CacheConfigException e) {
318             log.error(cacheName + " cache configuration invalid - check cache mode");
319         }
320         return cache;
321     }
322
323     @SuppressWarnings({ "unchecked" })
324     private void retrieveCaches() {
325         if (this.clusterContainerService == null) {
326             log.error("Cluster Services is null, can't retrieve caches.");
327             return;
328         }
329
330         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
331         if (edgesDB == null) {
332             log.error("Failed to get cache for " + TOPOEDGESDB);
333         }
334
335         this.hostsDB =
336                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
337         if (hostsDB == null) {
338             log.error("Failed to get cache for " + TOPOHOSTSDB);
339         }
340
341         this.nodeConnectorsDB =
342                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
343         if (nodeConnectorsDB == null) {
344             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
345         }
346
347         this.userLinksDB =
348                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
349         if (userLinksDB == null) {
350             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
351         }
352     }
353
354     /**
355      * Function called after the topology manager has registered the service in
356      * OSGi service registry.
357      *
358      */
359     void started() {
360         updateThread.start();
361
362         // Start the batcher thread for the cluster wide topology updates
363         notifyThread.start();
364         // SollicitRefresh MUST be called here else if called at init
365         // time it may sollicit refresh too soon.
366         log.debug("Sollicit topology refresh");
367         topoService.sollicitRefresh();
368     }
369
370     void stop() {
371         shuttingDown = true;
372         updateThread.interrupt();
373         notifyThread.interrupt();
374         pendingTimer.cancel();
375     }
376
377     /**
378      * Function called by the dependency manager when at least one dependency
379      * become unsatisfied or when the component is shutting down because for
380      * example bundle is being stopped.
381      *
382      */
383     void destroy() {
384         updateQ.clear();
385         updateThread = null;
386         pendingTimer = null;
387         notifyQ.clear();
388         notifyThread = null;
389     }
390
391     private void loadConfiguration() {
392         for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
393             addUserLink((TopologyUserLinkConfig) conf);
394         }
395     }
396
397     @Override
398     public Status saveConfig() {
399         return saveConfigInternal();
400     }
401
402     public Status saveConfigInternal() {
403         Status saveStatus = configurationService.persistConfiguration(
404                 new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
405
406         if (!saveStatus.isSuccess()) {
407             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
408         }
409         return saveStatus;
410     }
411
412     @Override
413     public Map<Node, Set<Edge>> getNodeEdges() {
414         if (this.edgesDB == null) {
415             return null;
416         }
417
418         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
419         for (Edge edge : this.edgesDB.keySet()) {
420             // Lets analyze the tail
421             Node node = edge.getTailNodeConnector().getNode();
422             Set<Edge> nodeEdges = res.get(node);
423             if (nodeEdges == null) {
424                 nodeEdges = new HashSet<Edge>();
425                 res.put(node, nodeEdges);
426             }
427             nodeEdges.add(edge);
428
429             // Lets analyze the head
430             node = edge.getHeadNodeConnector().getNode();
431             nodeEdges = res.get(node);
432             if (nodeEdges == null) {
433                 nodeEdges = new HashSet<Edge>();
434                 res.put(node, nodeEdges);
435             }
436             nodeEdges.add(edge);
437         }
438
439         return res;
440     }
441
442     @Override
443     public boolean isInternal(NodeConnector p) {
444         if (this.nodeConnectorsDB == null) {
445             return false;
446         }
447
448         // This is an internal NodeConnector if is connected to
449         // another Node i.e it's part of the nodeConnectorsDB
450         return (this.nodeConnectorsDB.get(p) != null);
451     }
452
453     /**
454      * This method returns true if the edge is an ISL link.
455      *
456      * @param e
457      *            The edge
458      * @return true if it is an ISL link
459      */
460     public boolean isISLink(Edge e) {
461         return (!isProductionLink(e));
462     }
463
464     /**
465      * This method returns true if the edge is a production link.
466      *
467      * @param e
468      *            The edge
469      * @return true if it is a production link
470      */
471     public boolean isProductionLink(Edge e) {
472         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
473                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
474     }
475
476     /**
477      * This method cross checks the determination of nodeConnector type by Discovery Service
478      * against the information in SwitchManager and updates it accordingly.
479      * @param e
480      *          The edge
481      */
482     private void crossCheckNodeConnectors(Edge e) {
483         NodeConnector nc;
484         if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
485             nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
486             if (nc != null) {
487                 e.setHeadNodeConnector(nc);
488             }
489         }
490         if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
491             nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
492             if (nc != null) {
493                 e.setTailNodeConnector(nc);
494             }
495         }
496     }
497
498     /**
499      * A NodeConnector may have been categorized as of type Production by Discovery Service.
500      * But at the time when this determination was made, only OF nodes were known to Discovery
501      * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
502      * so, then it returns a new NodeConnector with correct type.
503      *
504      * @param nc
505      *       NodeConnector as passed on in the edge
506      * @return
507      *       If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
508      *       with correct type, null otherwise
509      */
510
511     private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
512
513         for (Node node : switchManager.getNodes()) {
514             String nodeName = node.getNodeIDString();
515             log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
516                     nc.getNode().getNodeIDString());
517             if (nodeName.equals(nc.getNode().getNodeIDString())) {
518                 NodeConnector nodeConnector = NodeConnectorCreator
519                         .createNodeConnector(node.getType(), nc.getID(), node);
520                 return nodeConnector;
521             }
522         }
523         return null;
524     }
525
526     /**
527      * The Map returned is a copy of the current topology hence if the topology
528      * changes the copy doesn't
529      *
530      * @return A Map representing the current topology expressed as edges of the
531      *         network
532      */
533     @Override
534     public Map<Edge, Set<Property>> getEdges() {
535         if (this.edgesDB == null) {
536             return null;
537         }
538
539         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
540         Set<Property> props;
541         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
542             // Sets of props are copied because the composition of
543             // those properties could change with time
544             props = new HashSet<Property>(edgeEntry.getValue());
545             // We can simply reuse the key because the object is
546             // immutable so doesn't really matter that we are
547             // referencing the only owned by a different table, the
548             // meaning is the same because doesn't change with time.
549             edgeMap.put(edgeEntry.getKey(), props);
550         }
551
552         return edgeMap;
553     }
554
555     @Override
556     public Set<NodeConnector> getNodeConnectorWithHost() {
557         if (this.hostsDB == null) {
558             return null;
559         }
560
561         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
562     }
563
564     @Override
565     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
566         if (this.hostsDB == null) {
567             return null;
568         }
569         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
570         Node node;
571         Set<NodeConnector> portSet;
572         for (NodeConnector nc : this.hostsDB.keySet()) {
573             node = nc.getNode();
574             portSet = res.get(node);
575             if (portSet == null) {
576                 // Create the HashSet if null
577                 portSet = new HashSet<NodeConnector>();
578                 res.put(node, portSet);
579             }
580
581             // Keep updating the HashSet, given this is not a
582             // clustered map we can just update the set without
583             // worrying to update the hashmap.
584             portSet.add(nc);
585         }
586
587         return (res);
588     }
589
590     @Override
591     public Host getHostAttachedToNodeConnector(NodeConnector port) {
592         List<Host> hosts = getHostsAttachedToNodeConnector(port);
593         if(hosts != null && !hosts.isEmpty()){
594             return hosts.get(0);
595         }
596         return null;
597     }
598
599     @Override
600     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
601         Set<ImmutablePair<Host, Set<Property>>> hosts;
602         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
603             return null;
604         }
605         // create a list of hosts
606         List<Host> retHosts = new LinkedList<Host>();
607         for(ImmutablePair<Host, Set<Property>> host : hosts) {
608             retHosts.add(host.getLeft());
609         }
610         return retHosts;
611     }
612
613     @Override
614     public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
615
616         // Clone the property set in case non null else just
617         // create an empty one. Caches allocated via infinispan
618         // don't allow null values
619         if (props == null) {
620             props = new HashSet<Property>();
621         } else {
622             props = new HashSet<Property>(props);
623         }
624         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
625
626         // get the host list
627         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
628         if(hostSet == null) {
629             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
630         }
631         switch (t) {
632         case ADDED:
633         case CHANGED:
634             hostSet.add(thisHost);
635             this.hostsDB.put(port, hostSet);
636             break;
637         case REMOVED:
638             hostSet.remove(thisHost);
639             if(hostSet.isEmpty()) {
640                 //remove only if hasn't been concurrently modified
641                 this.hostsDB.remove(port, hostSet);
642             } else {
643                 this.hostsDB.put(port, hostSet);
644             }
645             break;
646         }
647     }
648
649     private boolean headNodeConnectorExist(Edge e) {
650         /*
651          * Only check the head end point which is supposed to be part of a
652          * network node we control (present in our inventory). If we checked the
653          * tail end point as well, we would not store the edges that connect to
654          * a non sdn enable port on a non sdn capable production switch. We want
655          * to be able to see these switches on the topology.
656          */
657         NodeConnector head = e.getHeadNodeConnector();
658         return (switchManager.doesNodeConnectorExist(head));
659     }
660
661     private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
662         NodeConnector head = e.getHeadNodeConnector();
663         PendingUpdateTask task = new PendingUpdateTask(e, p, t);
664         synchronized (pendingUpdates) {
665             List<PendingUpdateTask> list = pendingUpdates.get(head);
666             if (list == null) {
667                 list = new LinkedList<PendingUpdateTask>();
668                 pendingUpdates.put(head, list);
669             }
670             list.add(task);
671             pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
672         }
673     }
674
675     private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
676         NodeConnector head = e.getHeadNodeConnector();
677         synchronized (pendingUpdates) {
678             List<PendingUpdateTask> list = pendingUpdates.get(head);
679             if (list != null) {
680                 log.warn("Enqueue edge update: edge {}, type {}", e, t);
681                 PendingUpdateTask task = new PendingUpdateTask(e, p, t);
682                 list.add(task);
683                 pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
684                 return true;
685             }
686         }
687
688         return false;
689     }
690
691     private boolean removePendingEvent(PendingUpdateTask t) {
692         t.cancel();
693         NodeConnector head = t.getHeadNodeConnector();
694         boolean removed = false;
695
696         synchronized (pendingUpdates) {
697             List<PendingUpdateTask> list = pendingUpdates.get(head);
698             if (list != null) {
699                 removed = list.remove(t);
700                 if (list.isEmpty()) {
701                     pendingUpdates.remove(head);
702                 }
703             }
704         }
705
706         return removed;
707     }
708
709     private void removePendingEvent(NodeConnector head, boolean doFlush) {
710         List<PendingUpdateTask> list;
711         synchronized (pendingUpdates) {
712             list = pendingUpdates.remove(head);
713         }
714
715         if (list != null) {
716             for (PendingUpdateTask task : list) {
717                 if (task.cancel() && doFlush) {
718                     task.flush();
719                 }
720             }
721             pendingTimer.purge();
722         }
723     }
724
725     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
726         return edgeUpdate(e, type, props, false);
727     }
728
729     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
730         if (!type.equals(UpdateType.ADDED) &&
731             enqueueEventIfPending(e, props, type)) {
732             return null;
733         }
734
735         switch (type) {
736         case ADDED:
737             if (this.edgesDB.containsKey(e)) {
738                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
739                 log.trace("Skipping redundant edge addition: {}", e);
740                 return null;
741             }
742
743             // Ensure that head node connector exists
744             if (!isPending) {
745                 if (headNodeConnectorExist(e)) {
746                     removePendingEvent(e.getHeadNodeConnector(), true);
747                 } else {
748                     log.warn("Ignore edge that contains invalid node connector: {}",
749                              e);
750                     addPendingEvent(e, props, type);
751                     return null;
752                 }
753             }
754
755             // Make sure the props are non-null or create a copy
756             if (props == null) {
757                 props = new HashSet<Property>();
758             } else {
759                 props = new HashSet<Property>(props);
760             }
761
762             // Check if nodeConnectors of the edge were correctly categorized
763             // by protocol plugin
764             crossCheckNodeConnectors(e);
765
766             // Now make sure there is the creation timestamp for the
767             // edge, if not there, stamp with the first update
768             boolean found_create = false;
769             for (Property prop : props) {
770                 if (prop instanceof TimeStamp) {
771                     TimeStamp t = (TimeStamp) prop;
772                     if (t.getTimeStampName().equals("creation")) {
773                         found_create = true;
774                         break;
775                     }
776                 }
777             }
778
779             if (!found_create) {
780                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
781                 props.add(t);
782             }
783
784             // Now add this in the database eventually overriding
785             // something that may have been already existing
786             this.edgesDB.put(e, props);
787
788             // Now populate the DB of NodeConnectors
789             // NOTE WELL: properties are empty sets, not really needed
790             // for now.
791             // The DB only contains ISL ports
792             if (isISLink(e)) {
793                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
794                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
795             }
796             log.trace("Edge {}  {}", e.toString(), type.name());
797             break;
798         case REMOVED:
799             // Now remove the edge from edgesDB
800             this.edgesDB.remove(e);
801
802             // Now lets update the NodeConnectors DB, the assumption
803             // here is that two NodeConnector are exclusively
804             // connected by 1 and only 1 edge, this is reasonable in
805             // the same plug (virtual of phisical) we can assume two
806             // cables won't be plugged. This could break only in case
807             // of devices in the middle that acts as hubs, but it
808             // should be safe to assume that won't happen.
809             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
810             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
811             log.trace("Edge {}  {}", e.toString(), type.name());
812             break;
813         case CHANGED:
814             Set<Property> oldProps = this.edgesDB.get(e);
815
816             // When property(s) changes lets make sure we can change it
817             // all except the creation time stamp because that should
818             // be set only when the edge is created
819             TimeStamp timeStamp = null;
820             if (oldProps != null) {
821                 for (Property prop : oldProps) {
822                     if (prop instanceof TimeStamp) {
823                         TimeStamp tsProp = (TimeStamp) prop;
824                         if (tsProp.getTimeStampName().equals("creation")) {
825                             timeStamp = tsProp;
826                             break;
827                         }
828                     }
829                 }
830             }
831
832             // Now lets make sure new properties are non-null
833             if (props == null) {
834                 props = new HashSet<Property>();
835             } else {
836                 // Copy the set so noone is going to change the content
837                 props = new HashSet<Property>(props);
838             }
839
840             // Now lets remove the creation property if exist in the
841             // new props
842             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
843                 Property prop = i.next();
844                 if (prop instanceof TimeStamp) {
845                     TimeStamp t = (TimeStamp) prop;
846                     if (t.getTimeStampName().equals("creation")) {
847                         if (timeStamp != null) {
848                             i.remove();
849                         }
850                         break;
851                     }
852                 }
853             }
854
855             // Now lets add the creation timestamp in it
856             if (timeStamp != null) {
857                 props.add(timeStamp);
858             }
859
860             // Finally update
861             this.edgesDB.put(e, props);
862             log.trace("Edge {}  {}", e.toString(), type.name());
863             break;
864         }
865         return new TopoEdgeUpdate(e, props, type);
866     }
867
868     private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
869         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
870         for (TopoEdgeUpdate teu : topoedgeupdateList) {
871             boolean isPending = (teu instanceof PendingEdgeUpdate);
872             Edge e = teu.getEdge();
873             Set<Property> p = teu.getProperty();
874             UpdateType type = teu.getUpdateType();
875             TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
876             if (update != null) {
877                 teuList.add(update);
878             }
879         }
880
881         if (!teuList.isEmpty()) {
882             // Now update the listeners
883             for (ITopologyManagerAware s : this.topologyManagerAware) {
884                 try {
885                     s.edgeUpdate(teuList);
886                 } catch (Exception exc) {
887                     log.error("Exception on edge update:", exc);
888                 }
889             }
890         }
891     }
892
893     @Override
894     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
895         updateQ.addAll(topoedgeupdateList);
896     }
897
898     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
899         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
900                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
901         return getLinkTuple(rLink);
902     }
903
904
905     private Edge getLinkTuple(TopologyUserLinkConfig link) {
906         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
907         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
908         try {
909             return new Edge(srcNodeConnector, dstNodeConnector);
910         } catch (Exception e) {
911             return null;
912         }
913     }
914
915     @Override
916     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
917         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
918     }
919
920     @Override
921     public Status addUserLink(TopologyUserLinkConfig userLink) {
922         if (!userLink.isValid()) {
923             return new Status(StatusCode.BADREQUEST,
924                     "User link configuration invalid.");
925         }
926         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
927
928         //Check if this link already configured
929         //NOTE: infinispan cache doesn't support Map.containsValue()
930         // (which is linear time in most ConcurrentMap impl anyway)
931         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
932             if (existingLink.equals(userLink)) {
933                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
934             }
935         }
936         //attempt put, if mapping for this key already existed return conflict
937         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
938             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
939                     + " already exists. Please use another name");
940         }
941
942         Edge linkTuple = getLinkTuple(userLink);
943         if (linkTuple != null) {
944             if (!isProductionLink(linkTuple)) {
945                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
946                                                 new HashSet<Property>());
947                 if (teu == null) {
948                     userLinksDB.remove(userLink.getName());
949                     return new Status(StatusCode.NOTFOUND,
950                            "Link configuration contains invalid node connector: "
951                            + userLink);
952                 }
953             }
954
955             linkTuple = getReverseLinkTuple(userLink);
956             if (linkTuple != null) {
957                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
958                 if (!isProductionLink(linkTuple)) {
959                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
960                 }
961             }
962         }
963         return new Status(StatusCode.SUCCESS);
964     }
965
966     @Override
967     public Status deleteUserLink(String linkName) {
968         if (linkName == null) {
969             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
970         }
971
972         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
973         Edge linkTuple;
974         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
975             if (! isProductionLink(linkTuple)) {
976                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
977             }
978
979             linkTuple = getReverseLinkTuple(link);
980             if (! isProductionLink(linkTuple)) {
981                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
982             }
983         }
984         return new Status(StatusCode.SUCCESS);
985     }
986
987     private void registerWithOSGIConsole() {
988         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
989                 .getBundleContext();
990         bundleContext.registerService(CommandProvider.class.getName(), this,
991                 null);
992     }
993
994     @Override
995     public String getHelp() {
996         StringBuffer help = new StringBuffer();
997         help.append("---Topology Manager---\n");
998         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
999         help.append("\t deleteUserLink <name>\n");
1000         help.append("\t printUserLink\n");
1001         help.append("\t printNodeEdges\n");
1002         return help.toString();
1003     }
1004
1005     public void _printUserLink(CommandInterpreter ci) {
1006         for (String name : this.userLinksDB.keySet()) {
1007             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1008             ci.println("Name : " + name);
1009             ci.println(linkConfig);
1010             ci.println("Edge " + getLinkTuple(linkConfig));
1011             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
1012         }
1013     }
1014
1015     public void _addUserLink(CommandInterpreter ci) {
1016         String name = ci.nextArgument();
1017         if ((name == null)) {
1018             ci.println("Please enter a valid Name");
1019             return;
1020         }
1021
1022         String ncStr1 = ci.nextArgument();
1023         if (ncStr1 == null) {
1024             ci.println("Please enter two node connector strings");
1025             return;
1026         }
1027         String ncStr2 = ci.nextArgument();
1028         if (ncStr2 == null) {
1029             ci.println("Please enter second node connector string");
1030             return;
1031         }
1032
1033         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1034         if (nc1 == null) {
1035             ci.println("Invalid input node connector 1 string: " + ncStr1);
1036             return;
1037         }
1038         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1039         if (nc2 == null) {
1040             ci.println("Invalid input node connector 2 string: " + ncStr2);
1041             return;
1042         }
1043
1044         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1045         ci.println(this.addUserLink(config));
1046     }
1047
1048     public void _deleteUserLink(CommandInterpreter ci) {
1049         String name = ci.nextArgument();
1050         if ((name == null)) {
1051             ci.println("Please enter a valid Name");
1052             return;
1053         }
1054         this.deleteUserLink(name);
1055     }
1056
1057     public void _printNodeEdges(CommandInterpreter ci) {
1058         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1059         if (nodeEdges == null) {
1060             return;
1061         }
1062         Set<Node> nodeSet = nodeEdges.keySet();
1063         if (nodeSet == null) {
1064             return;
1065         }
1066         ci.println("        Node                                         Edge");
1067         for (Node node : nodeSet) {
1068             Set<Edge> edgeSet = nodeEdges.get(node);
1069             if (edgeSet == null) {
1070                 continue;
1071             }
1072             for (Edge edge : edgeSet) {
1073                 ci.println(node + "             " + edge);
1074             }
1075         }
1076     }
1077
1078     @Override
1079     public Object readObject(ObjectInputStream ois)
1080             throws FileNotFoundException, IOException, ClassNotFoundException {
1081         return ois.readObject();
1082     }
1083
1084     @Override
1085     public Status saveConfiguration() {
1086         return saveConfig();
1087     }
1088
1089     @Override
1090     public void edgeOverUtilized(Edge edge) {
1091         log.warn("Link Utilization above normal: {}", edge);
1092     }
1093
1094     @Override
1095     public void edgeUtilBackToNormal(Edge edge) {
1096         log.warn("Link Utilization back to normal: {}", edge);
1097     }
1098
1099     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
1100         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
1101         upd.setLocal(isLocal);
1102         notifyQ.add(upd);
1103     }
1104
1105     @Override
1106     public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
1107         // NOP
1108     }
1109
1110     @Override
1111     public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
1112         // Remove pending edge updates for the given node connector.
1113         // Pending events should be notified if the node connector exists.
1114         boolean doFlush = !type.equals(UpdateType.REMOVED);
1115         removePendingEvent(nc, doFlush);
1116     }
1117
1118     @Override
1119     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
1120         if (cacheName.equals(TOPOEDGESDB)) {
1121             // This is the case of an Edge being added to the topology DB
1122             final Edge e = (Edge) key;
1123             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
1124             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
1125         }
1126     }
1127
1128     @Override
1129     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
1130         if (cacheName.equals(TOPOEDGESDB)) {
1131             final Edge e = (Edge) key;
1132             log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
1133             final Set<Property> props = (Set<Property>) new_value;
1134             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
1135         }
1136     }
1137
1138     @Override
1139     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
1140         if (cacheName.equals(TOPOEDGESDB)) {
1141             final Edge e = (Edge) key;
1142             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
1143             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
1144         }
1145     }
1146
1147     class TopologyNotify implements Runnable {
1148         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
1149         private TopoEdgeUpdate entry;
1150         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
1151         private boolean notifyListeners;
1152
1153         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
1154             this.notifyQ = notifyQ;
1155         }
1156
1157         @Override
1158         public void run() {
1159             while (true) {
1160                 try {
1161                     log.trace("New run of TopologyNotify");
1162                     notifyListeners = false;
1163                     // First we block waiting for an element to get in
1164                     entry = notifyQ.take();
1165                     // Then we drain the whole queue if elements are
1166                     // in it without getting into any blocking condition
1167                     for (; entry != null; entry = notifyQ.poll()) {
1168                         teuList.add(entry);
1169                         notifyListeners = true;
1170                     }
1171
1172                     // Notify listeners only if there were updates drained else
1173                     // give up
1174                     if (notifyListeners) {
1175                         log.trace("Notifier thread, notified a listener");
1176                         // Now update the listeners
1177                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
1178                             try {
1179                                 s.edgeUpdate(teuList);
1180                             } catch (Exception exc) {
1181                                 log.error("Exception on edge update:", exc);
1182                             }
1183                         }
1184                     }
1185                     teuList.clear();
1186
1187                     // Lets sleep for sometime to allow aggregation of event
1188                     Thread.sleep(100);
1189                 } catch (InterruptedException e1) {
1190                     if (shuttingDown) {
1191                         return;
1192                     }
1193                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
1194                 } catch (Exception e2) {
1195                     log.error("", e2);
1196                 }
1197             }
1198         }
1199     }
1200
1201     public List<String> printUserLink() {
1202         List<String> result = new ArrayList<String>();
1203         for (String name : this.userLinksDB.keySet()) {
1204             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
1205             result.add("Name : " + name);
1206             result.add(linkConfig.toString());
1207             result.add("Edge " + getLinkTuple(linkConfig));
1208             result.add("Reverse Edge " + getReverseLinkTuple(linkConfig));
1209         }
1210         return result;
1211     }
1212
1213     public List<String> addUserLink(String name, String ncStr1, String ncStr2) {
1214         List<String> result = new ArrayList<String>();
1215         if ((name == null)) {
1216             result.add("Please enter a valid Name");
1217             return result;
1218         }
1219
1220         if (ncStr1 == null) {
1221             result.add("Please enter two node connector strings");
1222             return result;
1223         }
1224         if (ncStr2 == null) {
1225             result.add("Please enter second node connector string");
1226             return result;
1227         }
1228
1229         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
1230         if (nc1 == null) {
1231             result.add("Invalid input node connector 1 string: " + ncStr1);
1232             return result;
1233         }
1234         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
1235         if (nc2 == null) {
1236             result.add("Invalid input node connector 2 string: " + ncStr2);
1237             return result;
1238         }
1239
1240         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
1241         result.add(this.addUserLink(config).toString());
1242         return result;
1243     }
1244
1245     public List<String> deleteUserLinkShell(String name) {
1246         List<String> result = new ArrayList<String>();
1247         if ((name == null)) {
1248             result.add("Please enter a valid Name");
1249             return result;
1250         }
1251         this.deleteUserLink(name);
1252         return result;
1253     }
1254
1255     public List<String> printNodeEdges() {
1256         List<String> result = new ArrayList<String>();
1257         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
1258         if (nodeEdges == null) {
1259             return result;
1260         }
1261         Set<Node> nodeSet = nodeEdges.keySet();
1262         if (nodeSet == null) {
1263             return result;
1264         }
1265         result.add("        Node                                         Edge");
1266         for (Node node : nodeSet) {
1267             Set<Edge> edgeSet = nodeEdges.get(node);
1268             if (edgeSet == null) {
1269                 continue;
1270             }
1271             for (Edge edge : edgeSet) {
1272                 result.add(node + "             " + edge);
1273             }
1274         }
1275         return result;
1276     }
1277
1278     // Only for unit test.
1279     void startTest() {
1280         pendingTimer = new Timer("Topology Pending Update Timer");
1281         updateThread = new Thread(new UpdateTopology(), "Topology Update");
1282         updateThread.start();
1283     }
1284
1285     void stopTest() {
1286         shuttingDown = true;
1287         updateThread.interrupt();
1288         pendingTimer.cancel();
1289     }
1290
1291     boolean flushUpdateQueue(long timeout) {
1292         long limit = System.currentTimeMillis() + timeout;
1293         long cur;
1294         do {
1295             if (updateQ.peek() == null) {
1296                 return true;
1297             }
1298
1299             try {
1300                 Thread.sleep(100);
1301             } catch (InterruptedException e) {
1302                 break;
1303             }
1304             cur = System.currentTimeMillis();
1305         } while (cur < limit);
1306
1307         return false;
1308     }
1309 }