Implement cluster wide topology notifications and let routing use it
[controller.git] / opendaylight / topologymanager / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
index a043cbe92595ff011483296bc1fc27be6bb7a9ee..4972d3b5b5d73d41df3cfbbb99543db8fde2eb56 100644 (file)
@@ -21,9 +21,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.dm.Component;
@@ -31,6 +33,7 @@ import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
@@ -52,6 +55,7 @@ import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
 import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -63,9 +67,17 @@ import org.slf4j.LoggerFactory;
  * network topology. It provides service for applications to interact with
  * topology database and notifies all the listeners of topology changes.
  */
-public class TopologyManagerImpl implements ITopologyManager,
-IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
-CommandProvider {
+public class TopologyManagerImpl implements
+        ICacheUpdateAware,
+        ITopologyManager,
+        IConfigurationContainerAware,
+        IListenTopoUpdates,
+        IObjectReader,
+        CommandProvider {
+    static final String TOPOEDGESDB = "topologymanager.edgesDB";
+    static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+    static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+    static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
     private static final String SAVE = "Save";
     private ITopologyService topoService;
@@ -79,13 +91,16 @@ CommandProvider {
     // DB of all the NodeConnectors with an Host attached to it
     private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
     // Topology Manager Aware listeners
-    private Set<ITopologyManagerAware> topologyManagerAware =
-            new CopyOnWriteArraySet<ITopologyManagerAware>();;
-
+    private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+    // Topology Manager Aware listeners - for clusterwide updates
+    private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+            new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
     private String userLinksFileName;
     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
-    private ConcurrentMap<Long, String> configSaveEvent;
+    private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private volatile Boolean shuttingDown = false;
+    private Thread notifyThread;
 
 
     void nonClusterObjectCreate() {
@@ -93,7 +108,6 @@ CommandProvider {
         hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
         nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
         userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
-        configSaveEvent = new ConcurrentHashMap<Long, String>();
     }
 
     void setTopologyManagerAware(ITopologyManagerAware s) {
@@ -110,6 +124,20 @@ CommandProvider {
         }
     }
 
+    void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.add(s);
+        }
+    }
+
+    void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+        if (this.topologyManagerClusterWideAware != null) {
+            log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+            this.topologyManagerClusterWideAware.remove(s);
+        }
+    }
+
     void setTopoService(ITopologyService s) {
         log.debug("Adding ITopologyService: {}", s);
         this.topoService = s;
@@ -140,10 +168,8 @@ CommandProvider {
      *
      */
     void init(Component c) {
-
         allocateCaches();
         retrieveCaches();
-
         String containerName = null;
         Dictionary<?, ?> props = c.getServiceProperties();
         if (props != null) {
@@ -156,61 +182,52 @@ CommandProvider {
         userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
         registerWithOSGIConsole();
         loadConfiguration();
+        // Restore the shuttingDown status on init of the component
+        shuttingDown = false;
+        notifyThread = new Thread(new TopologyNotify(notifyQ));
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
-    private void allocateCaches(){
-        if (this.clusterContainerService == null) {
-            nonClusterObjectCreate();
-            log.error("Cluster Services unavailable, allocated non-cluster caches!");
-            return;
-        }
-
+    private void allocateCaches() {
         try {
-            this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(
-                    "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.edgesDB =
+                    (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+                            EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode");
+            log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                    .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.hostsDB =
+                    (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+                            TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode");
+            log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                    .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.nodeConnectorsDB =
+                    (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+                            TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode");
+            log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
         }
 
         try {
-            this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                    .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            this.userLinksDB =
+                    (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+                            TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
         } catch (CacheExistException cee) {
-            log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed");
+            log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {
-            log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode");
+            log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
         }
-
-        try {
-            this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                    .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode");
-        }
-
     }
 
     @SuppressWarnings({ "unchecked", "deprecation" })
@@ -220,36 +237,28 @@ CommandProvider {
             return;
         }
 
-        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.edgesDB");
+        this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
         if (edgesDB == null) {
-            log.error("Failed to get cache for topologymanager.edgesDB");
+            log.error("Failed to get cache for " + TOPOEDGESDB);
         }
 
-        this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
-                .getCache("topologymanager.hostsDB");
+        this.hostsDB =
+                (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
         if (hostsDB == null) {
-            log.error("Failed to get cache for topologymanager.hostsDB");
+            log.error("Failed to get cache for " + TOPOHOSTSDB);
         }
 
-        this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
-                .getCache("topologymanager.nodeConnectorDB");
+        this.nodeConnectorsDB =
+                (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
         if (nodeConnectorsDB == null) {
-            log.error("Failed to get cache for topologymanager.nodeConnectorDB");
+            log.error("Failed to get cache for " + TOPONODECONNECTORDB);
         }
 
-        this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
-                .getCache("topologymanager.userLinksDB");
+        this.userLinksDB =
+                (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
         if (userLinksDB == null) {
-            log.error("Failed to get cache for topologymanager.userLinksDB");
+            log.error("Failed to get cache for " + TOPOUSERLINKSDB);
         }
-
-        this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
-                .getCache("topologymanager.configSaveEvent");
-        if (configSaveEvent == null) {
-            log.error("Failed to get cache for topologymanager.configSaveEvent");
-        }
-
     }
 
     /**
@@ -258,12 +267,19 @@ CommandProvider {
      *
      */
     void started() {
+        // Start the batcher thread for the cluster wide topology updates
+        notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
         // time it may sollicit refresh too soon.
         log.debug("Sollicit topology refresh");
         topoService.sollicitRefresh();
     }
 
+    void stop() {
+        shuttingDown = true;
+        notifyThread.interrupt();
+    }
+
     /**
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
@@ -271,6 +287,8 @@ CommandProvider {
      *
      */
     void destroy() {
+        notifyQ.clear();
+        notifyThread = null;
     }
 
     @SuppressWarnings("unchecked")
@@ -288,8 +306,6 @@ CommandProvider {
 
     @Override
     public Status saveConfig() {
-        // Publish the save config event to the cluster
-        configSaveEvent.put(new Date().getTime(), SAVE );
         return saveConfigInternal();
     }
 
@@ -436,7 +452,7 @@ CommandProvider {
     @Override
     public Host getHostAttachedToNodeConnector(NodeConnector port) {
         ImmutablePair<Host, Set<Property>> host;
-        if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) {
+        if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
             return null;
         }
         return host.getLeft();
@@ -674,7 +690,7 @@ CommandProvider {
 
         TopologyUserLinkConfig link = userLinksDB.remove(linkName);
         Edge linkTuple;
-        if (link != null && (linkTuple = getLinkTuple(link)) != null) {
+        if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
             if (! isProductionLink(linkTuple)) {
                 edgeUpdate(linkTuple, UpdateType.REMOVED, null);
             }
@@ -799,4 +815,92 @@ CommandProvider {
         log.warn("Link Utilization back to normal: {}", edge);
     }
 
+    private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+        TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+        upd.setLocal(isLocal);
+        notifyQ.add(upd);
+    }
+
+    @Override
+    public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            // This is the case of an Edge being added to the topology DB
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+        }
+    }
+
+    @Override
+    public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+            final Set<Property> props = (Set<Property>) new_value;
+            edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+        }
+    }
+
+    @Override
+    public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+        if (cacheName.equals(TOPOEDGESDB)) {
+            final Edge e = (Edge) key;
+            log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+            edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+        }
+    }
+
+    class TopologyNotify implements Runnable {
+        private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+        private TopoEdgeUpdate entry;
+        private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+        private boolean notifyListeners;
+
+        TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+            this.notifyQ = notifyQ;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    log.trace("New run of TopologyNotify");
+                    notifyListeners = false;
+                    // First we block waiting for an element to get in
+                    entry = notifyQ.take();
+                    // Then we drain the whole queue if elements are
+                    // in it without getting into any blocking condition
+                    for (; entry != null; entry = notifyQ.poll()) {
+                        teuList.add(entry);
+                        notifyListeners = true;
+                    }
+
+                    // Notify listeners only if there were updates drained else
+                    // give up
+                    if (notifyListeners) {
+                        log.trace("Notifier thread, notified a listener");
+                        // Now update the listeners
+                        for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+                            try {
+                                s.edgeUpdate(teuList);
+                            } catch (Exception exc) {
+                                log.error("Exception on edge update:", exc);
+                            }
+                        }
+                    }
+                    teuList.clear();
+
+                    // Lets sleep for sometime to allow aggregation of event
+                    Thread.sleep(100);
+                } catch (InterruptedException e1) {
+                    log.warn("TopologyNotify interrupted {}", e1.getMessage());
+                    if (shuttingDown) {
+                        return;
+                    }
+                } catch (Exception e2) {
+                    log.error("", e2);
+                }
+            }
+        }
+    }
 }