a410f99ba9c8d619a3c2771be95f09f410b270a3
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.topologymanager.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.util.ArrayList;
15 import java.util.Dictionary;
16 import java.util.EnumSet;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.apache.felix.dm.Component;
32 import org.eclipse.osgi.framework.console.CommandInterpreter;
33 import org.eclipse.osgi.framework.console.CommandProvider;
34 import org.opendaylight.controller.clustering.services.CacheConfigException;
35 import org.opendaylight.controller.clustering.services.CacheExistException;
36 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
37 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
38 import org.opendaylight.controller.clustering.services.IClusterServices;
39 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
40 import org.opendaylight.controller.sal.core.Edge;
41 import org.opendaylight.controller.sal.core.Host;
42 import org.opendaylight.controller.sal.core.Node;
43 import org.opendaylight.controller.sal.core.NodeConnector;
44 import org.opendaylight.controller.sal.core.Property;
45 import org.opendaylight.controller.sal.core.TimeStamp;
46 import org.opendaylight.controller.sal.core.UpdateType;
47 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
48 import org.opendaylight.controller.sal.topology.ITopologyService;
49 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
50 import org.opendaylight.controller.sal.utils.GlobalConstants;
51 import org.opendaylight.controller.sal.utils.IObjectReader;
52 import org.opendaylight.controller.sal.utils.ObjectReader;
53 import org.opendaylight.controller.sal.utils.ObjectWriter;
54 import org.opendaylight.controller.sal.utils.Status;
55 import org.opendaylight.controller.sal.utils.StatusCode;
56 import org.opendaylight.controller.switchmanager.ISwitchManager;
57 import org.opendaylight.controller.topologymanager.ITopologyManager;
58 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
59 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
60 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
61 import org.osgi.framework.BundleContext;
62 import org.osgi.framework.FrameworkUtil;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 /**
67  * The class describes TopologyManager which is the central repository of the
68  * network topology. It provides service for applications to interact with
69  * topology database and notifies all the listeners of topology changes.
70  */
71 public class TopologyManagerImpl implements
72         ICacheUpdateAware<Object, Object>,
73         ITopologyManager,
74         IConfigurationContainerAware,
75         IListenTopoUpdates,
76         IObjectReader,
77         CommandProvider {
78     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
79     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
80     protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
81     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
82     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
83     private ITopologyService topoService;
84     private IClusterContainerServices clusterContainerService;
85     private ISwitchManager switchManager;
86     // DB of all the Edges with properties which constitute our topology
87     private ConcurrentMap<Edge, Set<Property>> edgesDB;
88     // DB of all NodeConnector which are part of ISL Edges, meaning they
89     // are connected to another NodeConnector on the other side of an ISL link.
90     // NodeConnector of a Production Edge is not part of this DB.
91     private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
92     // DB of all the NodeConnectors with an Host attached to it
93     private ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>> hostsDB;
94     // Topology Manager Aware listeners
95     private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
96     // Topology Manager Aware listeners - for clusterwide updates
97     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
98             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
99     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
100     private String userLinksFileName;
101     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
102     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
103     private volatile Boolean shuttingDown = false;
104     private Thread notifyThread;
105
106
107     void nonClusterObjectCreate() {
108         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
109         hostsDB = new ConcurrentHashMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>();
110         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
111         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
112     }
113
114     void setTopologyManagerAware(ITopologyManagerAware s) {
115         if (this.topologyManagerAware != null) {
116             log.debug("Adding ITopologyManagerAware: {}", s);
117             this.topologyManagerAware.add(s);
118         }
119     }
120
121     void unsetTopologyManagerAware(ITopologyManagerAware s) {
122         if (this.topologyManagerAware != null) {
123             log.debug("Removing ITopologyManagerAware: {}", s);
124             this.topologyManagerAware.remove(s);
125         }
126     }
127
128     void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
129         if (this.topologyManagerClusterWideAware != null) {
130             log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
131             this.topologyManagerClusterWideAware.add(s);
132         }
133     }
134
135     void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
136         if (this.topologyManagerClusterWideAware != null) {
137             log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
138             this.topologyManagerClusterWideAware.remove(s);
139         }
140     }
141
142     void setTopoService(ITopologyService s) {
143         log.debug("Adding ITopologyService: {}", s);
144         this.topoService = s;
145     }
146
147     void unsetTopoService(ITopologyService s) {
148         if (this.topoService == s) {
149             log.debug("Removing ITopologyService: {}", s);
150             this.topoService = null;
151         }
152     }
153
154     void setClusterContainerService(IClusterContainerServices s) {
155         log.debug("Cluster Service set");
156         this.clusterContainerService = s;
157     }
158
159     void unsetClusterContainerService(IClusterContainerServices s) {
160         if (this.clusterContainerService == s) {
161             log.debug("Cluster Service removed!");
162             this.clusterContainerService = null;
163         }
164     }
165
166     void setSwitchManager(ISwitchManager s) {
167         log.debug("Adding ISwitchManager: {}", s);
168         this.switchManager = s;
169     }
170
171     void unsetSwitchManager(ISwitchManager s) {
172         if (this.switchManager == s) {
173             log.debug("Removing ISwitchManager: {}", s);
174             this.switchManager = null;
175         }
176     }
177
178     /**
179      * Function called by the dependency manager when all the required
180      * dependencies are satisfied
181      *
182      */
183     void init(Component c) {
184         allocateCaches();
185         retrieveCaches();
186         String containerName = null;
187         Dictionary<?, ?> props = c.getServiceProperties();
188         if (props != null) {
189             containerName = (String) props.get("containerName");
190         } else {
191             // In the Global instance case the containerName is empty
192             containerName = "UNKNOWN";
193         }
194
195         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
196         registerWithOSGIConsole();
197         loadConfiguration();
198         // Restore the shuttingDown status on init of the component
199         shuttingDown = false;
200         notifyThread = new Thread(new TopologyNotify(notifyQ));
201     }
202
203     @SuppressWarnings({ "unchecked" })
204     private void allocateCaches() {
205             this.edgesDB =
206                     (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
207
208             this.hostsDB =
209                     (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
210
211             this.nodeConnectorsDB =
212                     (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
213                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
214             this.userLinksDB =
215                     (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
216                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
217     }
218
219     private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
220         ConcurrentMap<?, ?> cache = null;
221         try {
222             cache = this.clusterContainerService.createCache(cacheName, cacheModes);
223         } catch (CacheExistException e) {
224             log.debug(cacheName + " cache already exists - destroy and recreate if needed");
225         } catch (CacheConfigException e) {
226             log.error(cacheName + " cache configuration invalid - check cache mode");
227         }
228         return cache;
229     }
230
231     @SuppressWarnings({ "unchecked" })
232     private void retrieveCaches() {
233         if (this.clusterContainerService == null) {
234             log.error("Cluster Services is null, can't retrieve caches.");
235             return;
236         }
237
238         this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
239         if (edgesDB == null) {
240             log.error("Failed to get cache for " + TOPOEDGESDB);
241         }
242
243         this.hostsDB =
244                 (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
245         if (hostsDB == null) {
246             log.error("Failed to get cache for " + TOPOHOSTSDB);
247         }
248
249         this.nodeConnectorsDB =
250                 (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
251         if (nodeConnectorsDB == null) {
252             log.error("Failed to get cache for " + TOPONODECONNECTORDB);
253         }
254
255         this.userLinksDB =
256                 (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
257         if (userLinksDB == null) {
258             log.error("Failed to get cache for " + TOPOUSERLINKSDB);
259         }
260     }
261
262     /**
263      * Function called after the topology manager has registered the service in
264      * OSGi service registry.
265      *
266      */
267     void started() {
268         // Start the batcher thread for the cluster wide topology updates
269         notifyThread.start();
270         // SollicitRefresh MUST be called here else if called at init
271         // time it may sollicit refresh too soon.
272         log.debug("Sollicit topology refresh");
273         topoService.sollicitRefresh();
274     }
275
276     void stop() {
277         shuttingDown = true;
278         notifyThread.interrupt();
279     }
280
281     /**
282      * Function called by the dependency manager when at least one dependency
283      * become unsatisfied or when the component is shutting down because for
284      * example bundle is being stopped.
285      *
286      */
287     void destroy() {
288         notifyQ.clear();
289         notifyThread = null;
290     }
291
292     @SuppressWarnings("unchecked")
293     private void loadConfiguration() {
294         ObjectReader objReader = new ObjectReader();
295         ConcurrentMap<String, TopologyUserLinkConfig> confList =
296                 (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
297
298         if (confList != null) {
299             for (TopologyUserLinkConfig conf : confList.values()) {
300                 addUserLink(conf);
301             }
302         }
303     }
304
305     @Override
306     public Status saveConfig() {
307         return saveConfigInternal();
308     }
309
310     public Status saveConfigInternal() {
311         ObjectWriter objWriter = new ObjectWriter();
312
313         Status saveStatus = objWriter.write(
314                 new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
315
316         if (! saveStatus.isSuccess()) {
317             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
318         }
319         return saveStatus;
320     }
321
322     @Override
323     public Map<Node, Set<Edge>> getNodeEdges() {
324         if (this.edgesDB == null) {
325             return null;
326         }
327
328         Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
329         for (Edge edge : this.edgesDB.keySet()) {
330             // Lets analyze the tail
331             Node node = edge.getTailNodeConnector().getNode();
332             Set<Edge> nodeEdges = res.get(node);
333             if (nodeEdges == null) {
334                 nodeEdges = new HashSet<Edge>();
335                 res.put(node, nodeEdges);
336             }
337             nodeEdges.add(edge);
338
339             // Lets analyze the head
340             node = edge.getHeadNodeConnector().getNode();
341             nodeEdges = res.get(node);
342             if (nodeEdges == null) {
343                 nodeEdges = new HashSet<Edge>();
344                 res.put(node, nodeEdges);
345             }
346             nodeEdges.add(edge);
347         }
348
349         return res;
350     }
351
352     @Override
353     public boolean isInternal(NodeConnector p) {
354         if (this.nodeConnectorsDB == null) {
355             return false;
356         }
357
358         // This is an internal NodeConnector if is connected to
359         // another Node i.e it's part of the nodeConnectorsDB
360         return (this.nodeConnectorsDB.get(p) != null);
361     }
362
363     /**
364      * This method returns true if the edge is an ISL link.
365      *
366      * @param e
367      *            The edge
368      * @return true if it is an ISL link
369      */
370     public boolean isISLink(Edge e) {
371         return (!isProductionLink(e));
372     }
373
374     /**
375      * This method returns true if the edge is a production link.
376      *
377      * @param e
378      *            The edge
379      * @return true if it is a production link
380      */
381     public boolean isProductionLink(Edge e) {
382         return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
383                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
384     }
385
386     /**
387      * The Map returned is a copy of the current topology hence if the topology
388      * changes the copy doesn't
389      *
390      * @return A Map representing the current topology expressed as edges of the
391      *         network
392      */
393     @Override
394     public Map<Edge, Set<Property>> getEdges() {
395         if (this.edgesDB == null) {
396             return null;
397         }
398
399         Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
400         Set<Property> props;
401         for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
402             // Sets of props are copied because the composition of
403             // those properties could change with time
404             props = new HashSet<Property>(edgeEntry.getValue());
405             // We can simply reuse the key because the object is
406             // immutable so doesn't really matter that we are
407             // referencing the only owned by a different table, the
408             // meaning is the same because doesn't change with time.
409             edgeMap.put(edgeEntry.getKey(), props);
410         }
411
412         return edgeMap;
413     }
414
415     @Override
416     public Set<NodeConnector> getNodeConnectorWithHost() {
417         if (this.hostsDB == null) {
418             return null;
419         }
420
421         return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
422     }
423
424     @Override
425     public Map<Node, Set<NodeConnector>> getNodesWithNodeConnectorHost() {
426         if (this.hostsDB == null) {
427             return null;
428         }
429         HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
430         Node node;
431         Set<NodeConnector> portSet;
432         for (NodeConnector nc : this.hostsDB.keySet()) {
433             node = nc.getNode();
434             portSet = res.get(node);
435             if (portSet == null) {
436                 // Create the HashSet if null
437                 portSet = new HashSet<NodeConnector>();
438                 res.put(node, portSet);
439             }
440
441             // Keep updating the HashSet, given this is not a
442             // clustered map we can just update the set without
443             // worrying to update the hashmap.
444             portSet.add(nc);
445         }
446
447         return (res);
448     }
449
450     @Override
451     public Host getHostAttachedToNodeConnector(NodeConnector port) {
452         List<Host> hosts = getHostsAttachedToNodeConnector(port);
453         if(hosts != null && !hosts.isEmpty()){
454             return hosts.get(0);
455         }
456         return null;
457     }
458
459     @Override
460     public List<Host> getHostsAttachedToNodeConnector(NodeConnector p) {
461         Set<ImmutablePair<Host, Set<Property>>> hosts;
462         if (this.hostsDB == null || (hosts = this.hostsDB.get(p)) == null) {
463             return null;
464         }
465         // create a list of hosts
466         List<Host> retHosts = new LinkedList<Host>();
467         for(ImmutablePair<Host, Set<Property>> host : hosts) {
468             retHosts.add(host.getLeft());
469         }
470         return retHosts;
471     }
472
473     @Override
474     public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
475
476         // Clone the property set in case non null else just
477         // create an empty one. Caches allocated via infinispan
478         // don't allow null values
479         if (props == null) {
480             props = new HashSet<Property>();
481         } else {
482             props = new HashSet<Property>(props);
483         }
484         ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
485
486         // get the host list
487         Set<ImmutablePair<Host, Set<Property>>> hostSet = this.hostsDB.get(port);
488         if(hostSet == null) {
489             hostSet = new HashSet<ImmutablePair<Host, Set<Property>>>();
490         }
491         switch (t) {
492         case ADDED:
493         case CHANGED:
494             hostSet.add(thisHost);
495             this.hostsDB.put(port, hostSet);
496             break;
497         case REMOVED:
498             hostSet.remove(thisHost);
499             if(hostSet.isEmpty()) {
500                 //remove only if hasn't been concurrently modified
501                 this.hostsDB.remove(port, hostSet);
502             } else {
503                 this.hostsDB.put(port, hostSet);
504             }
505             break;
506         }
507     }
508
509     private boolean headNodeConnectorExist(Edge e) {
510         /*
511          * Only check the head end point which is supposed to be part of a
512          * network node we control (present in our inventory). If we checked the
513          * tail end point as well, we would not store the edges that connect to
514          * a non sdn enable port on a non sdn capable production switch. We want
515          * to be able to see these switches on the topology.
516          */
517         NodeConnector head = e.getHeadNodeConnector();
518         return (switchManager.doesNodeConnectorExist(head));
519     }
520
521     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
522         switch (type) {
523         case ADDED:
524             // Avoid redundant update as notifications trigger expensive tasks
525             if (edgesDB.containsKey(e)) {
526                 log.trace("Skipping redundant edge addition: {}", e);
527                 return null;
528             }
529
530             // Ensure that head node connector exists
531             if (!headNodeConnectorExist(e)) {
532                 log.warn("Ignore edge that contains invalid node connector: {}", e);
533                 return null;
534             }
535
536             // Make sure the props are non-null
537             if (props == null) {
538                 props = new HashSet<Property>();
539             } else {
540                 props = new HashSet<Property>(props);
541             }
542
543             //in case of node switch-over to a different cluster controller,
544             //let's retain edge props
545             Set<Property> currentProps = this.edgesDB.get(e);
546             if (currentProps != null){
547                 props.addAll(currentProps);
548             }
549
550             // Now make sure there is the creation timestamp for the
551             // edge, if not there, stamp with the first update
552             boolean found_create = false;
553             for (Property prop : props) {
554                 if (prop instanceof TimeStamp) {
555                     TimeStamp t = (TimeStamp) prop;
556                     if (t.getTimeStampName().equals("creation")) {
557                         found_create = true;
558                         break;
559                     }
560                 }
561             }
562
563             if (!found_create) {
564                 TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
565                 props.add(t);
566             }
567
568             // Now add this in the database eventually overriding
569             // something that may have been already existing
570             this.edgesDB.put(e, props);
571
572             // Now populate the DB of NodeConnectors
573             // NOTE WELL: properties are empty sets, not really needed
574             // for now.
575             // The DB only contains ISL ports
576             if (isISLink(e)) {
577                 this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
578                 this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
579             }
580             log.trace("Edge {}  {}", e.toString(), type.name());
581             break;
582         case REMOVED:
583             // Now remove the edge from edgesDB
584             this.edgesDB.remove(e);
585
586             // Now lets update the NodeConnectors DB, the assumption
587             // here is that two NodeConnector are exclusively
588             // connected by 1 and only 1 edge, this is reasonable in
589             // the same plug (virtual of phisical) we can assume two
590             // cables won't be plugged. This could break only in case
591             // of devices in the middle that acts as hubs, but it
592             // should be safe to assume that won't happen.
593             this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
594             this.nodeConnectorsDB.remove(e.getTailNodeConnector());
595             log.trace("Edge {}  {}", e.toString(), type.name());
596             break;
597         case CHANGED:
598             Set<Property> oldProps = this.edgesDB.get(e);
599
600             // When property changes lets make sure we can change it
601             // all except the creation time stamp because that should
602             // be changed only when the edge is destroyed and created
603             // again
604             TimeStamp timeStamp = null;
605             for (Property prop : oldProps) {
606                 if (prop instanceof TimeStamp) {
607                     TimeStamp tsProp = (TimeStamp) prop;
608                     if (tsProp.getTimeStampName().equals("creation")) {
609                         timeStamp = tsProp;
610                         break;
611                     }
612                 }
613             }
614
615             // Now lets make sure new properties are non-null
616             if (props == null) {
617                 props = new HashSet<Property>();
618             } else {
619                 // Copy the set so noone is going to change the content
620                 props = new HashSet<Property>(props);
621             }
622
623             // Now lets remove the creation property if exist in the
624             // new props
625             for (Iterator<Property> i = props.iterator(); i.hasNext();) {
626                 Property prop = i.next();
627                 if (prop instanceof TimeStamp) {
628                     TimeStamp t = (TimeStamp) prop;
629                     if (t.getTimeStampName().equals("creation")) {
630                         i.remove();
631                         break;
632                     }
633                 }
634             }
635
636             // Now lets add the creation timestamp in it
637             if (timeStamp != null) {
638                 props.add(timeStamp);
639             }
640
641             // Finally update
642             this.edgesDB.put(e, props);
643             log.trace("Edge {}  {}", e.toString(), type.name());
644             break;
645         }
646         return new TopoEdgeUpdate(e, props, type);
647     }
648
649     @Override
650     public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
651         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
652         for (int i = 0; i < topoedgeupdateList.size(); i++) {
653             Edge e = topoedgeupdateList.get(i).getEdge();
654             Set<Property> p = topoedgeupdateList.get(i).getProperty();
655             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
656             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
657             if (teu != null) {
658                 teuList.add(teu);
659             }
660         }
661
662         if (!teuList.isEmpty()) {
663             // Now update the listeners
664             for (ITopologyManagerAware s : this.topologyManagerAware) {
665                 try {
666                     s.edgeUpdate(teuList);
667                 } catch (Exception exc) {
668                     log.error("Exception on edge update:", exc);
669                 }
670             }
671         }
672     }
673
674     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
675         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
676                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
677         return getLinkTuple(rLink);
678     }
679
680
681     private Edge getLinkTuple(TopologyUserLinkConfig link) {
682         NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
683         NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
684         try {
685             return new Edge(srcNodeConnector, dstNodeConnector);
686         } catch (Exception e) {
687             return null;
688         }
689     }
690
691     @Override
692     public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
693         return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
694     }
695
696     @Override
697     public Status addUserLink(TopologyUserLinkConfig userLink) {
698         if (!userLink.isValid()) {
699             return new Status(StatusCode.BADREQUEST,
700                     "User link configuration invalid.");
701         }
702         userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
703
704         //Check if this link already configured
705         //NOTE: infinispan cache doesn't support Map.containsValue()
706         // (which is linear time in most ConcurrentMap impl anyway)
707         for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
708             if (existingLink.equals(userLink)) {
709                 return new Status(StatusCode.CONFLICT, "Link configuration exists");
710             }
711         }
712         //attempt put, if mapping for this key already existed return conflict
713         if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
714             return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
715                     + " already exists. Please use another name");
716         }
717
718         Edge linkTuple = getLinkTuple(userLink);
719         if (linkTuple != null) {
720             if (!isProductionLink(linkTuple)) {
721                 TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
722                                                 new HashSet<Property>());
723                 if (teu == null) {
724                     userLinksDB.remove(userLink.getName());
725                     return new Status(StatusCode.NOTFOUND,
726                            "Link configuration contains invalid node connector: "
727                            + userLink);
728                 }
729             }
730
731             linkTuple = getReverseLinkTuple(userLink);
732             if (linkTuple != null) {
733                 userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
734                 if (!isProductionLink(linkTuple)) {
735                     edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
736                 }
737             }
738         }
739         return new Status(StatusCode.SUCCESS);
740     }
741
742     @Override
743     public Status deleteUserLink(String linkName) {
744         if (linkName == null) {
745             return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
746         }
747
748         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
749         Edge linkTuple;
750         if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
751             if (! isProductionLink(linkTuple)) {
752                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
753             }
754
755             linkTuple = getReverseLinkTuple(link);
756             if (! isProductionLink(linkTuple)) {
757                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
758             }
759         }
760         return new Status(StatusCode.SUCCESS);
761     }
762
763     private void registerWithOSGIConsole() {
764         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
765                 .getBundleContext();
766         bundleContext.registerService(CommandProvider.class.getName(), this,
767                 null);
768     }
769
770     @Override
771     public String getHelp() {
772         StringBuffer help = new StringBuffer();
773         help.append("---Topology Manager---\n");
774         help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
775         help.append("\t deleteUserLink <name>\n");
776         help.append("\t printUserLink\n");
777         help.append("\t printNodeEdges\n");
778         return help.toString();
779     }
780
781     public void _printUserLink(CommandInterpreter ci) {
782         for (String name : this.userLinksDB.keySet()) {
783             TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
784             ci.println("Name : " + name);
785             ci.println(linkConfig);
786             ci.println("Edge " + getLinkTuple(linkConfig));
787             ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
788         }
789     }
790
791     public void _addUserLink(CommandInterpreter ci) {
792         String name = ci.nextArgument();
793         if ((name == null)) {
794             ci.println("Please enter a valid Name");
795             return;
796         }
797
798         String ncStr1 = ci.nextArgument();
799         if (ncStr1 == null) {
800             ci.println("Please enter two node connector strings");
801             return;
802         }
803         String ncStr2 = ci.nextArgument();
804         if (ncStr2 == null) {
805             ci.println("Please enter second node connector string");
806             return;
807         }
808
809         NodeConnector nc1 = NodeConnector.fromString(ncStr1);
810         if (nc1 == null) {
811             ci.println("Invalid input node connector 1 string: " + ncStr1);
812             return;
813         }
814         NodeConnector nc2 = NodeConnector.fromString(ncStr2);
815         if (nc2 == null) {
816             ci.println("Invalid input node connector 2 string: " + ncStr2);
817             return;
818         }
819
820         TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
821         ci.println(this.addUserLink(config));
822     }
823
824     public void _deleteUserLink(CommandInterpreter ci) {
825         String name = ci.nextArgument();
826         if ((name == null)) {
827             ci.println("Please enter a valid Name");
828             return;
829         }
830         this.deleteUserLink(name);
831     }
832
833     public void _printNodeEdges(CommandInterpreter ci) {
834         Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
835         if (nodeEdges == null) {
836             return;
837         }
838         Set<Node> nodeSet = nodeEdges.keySet();
839         if (nodeSet == null) {
840             return;
841         }
842         ci.println("        Node                                         Edge");
843         for (Node node : nodeSet) {
844             Set<Edge> edgeSet = nodeEdges.get(node);
845             if (edgeSet == null) {
846                 continue;
847             }
848             for (Edge edge : edgeSet) {
849                 ci.println(node + "             " + edge);
850             }
851         }
852     }
853
854     @Override
855     public Object readObject(ObjectInputStream ois)
856             throws FileNotFoundException, IOException, ClassNotFoundException {
857         return ois.readObject();
858     }
859
860     @Override
861     public Status saveConfiguration() {
862         return saveConfig();
863     }
864
865     @Override
866     public void edgeOverUtilized(Edge edge) {
867         log.warn("Link Utilization above normal: {}", edge);
868     }
869
870     @Override
871     public void edgeUtilBackToNormal(Edge edge) {
872         log.warn("Link Utilization back to normal: {}", edge);
873     }
874
875     private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
876         TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
877         upd.setLocal(isLocal);
878         notifyQ.add(upd);
879     }
880
881     @Override
882     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
883         if (cacheName.equals(TOPOEDGESDB)) {
884             // This is the case of an Edge being added to the topology DB
885             final Edge e = (Edge) key;
886             log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
887             edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
888         }
889     }
890
891     @Override
892     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
893         if (cacheName.equals(TOPOEDGESDB)) {
894             final Edge e = (Edge) key;
895             log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
896             final Set<Property> props = (Set<Property>) new_value;
897             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
898         }
899     }
900
901     @Override
902     public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
903         if (cacheName.equals(TOPOEDGESDB)) {
904             final Edge e = (Edge) key;
905             log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
906             edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
907         }
908     }
909
910     class TopologyNotify implements Runnable {
911         private final BlockingQueue<TopoEdgeUpdate> notifyQ;
912         private TopoEdgeUpdate entry;
913         private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
914         private boolean notifyListeners;
915
916         TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
917             this.notifyQ = notifyQ;
918         }
919
920         @Override
921         public void run() {
922             while (true) {
923                 try {
924                     log.trace("New run of TopologyNotify");
925                     notifyListeners = false;
926                     // First we block waiting for an element to get in
927                     entry = notifyQ.take();
928                     // Then we drain the whole queue if elements are
929                     // in it without getting into any blocking condition
930                     for (; entry != null; entry = notifyQ.poll()) {
931                         teuList.add(entry);
932                         notifyListeners = true;
933                     }
934
935                     // Notify listeners only if there were updates drained else
936                     // give up
937                     if (notifyListeners) {
938                         log.trace("Notifier thread, notified a listener");
939                         // Now update the listeners
940                         for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
941                             try {
942                                 s.edgeUpdate(teuList);
943                             } catch (Exception exc) {
944                                 log.error("Exception on edge update:", exc);
945                             }
946                         }
947                     }
948                     teuList.clear();
949
950                     // Lets sleep for sometime to allow aggregation of event
951                     Thread.sleep(100);
952                 } catch (InterruptedException e1) {
953                     if (shuttingDown) {
954                         return;
955                     }
956                     log.warn("TopologyNotify interrupted {}", e1.getMessage());
957                 } catch (Exception e2) {
958                     log.error("", e2);
959                 }
960             }
961         }
962     }
963 }