Merge "Fix clustering versions"
[controller.git] / opendaylight / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
index 8f905a999a02f3b2c5f3d12395bc0d1a97ac5cc1..ff1c026a344d74efa5aa1f1b57e05e60775cd1a4 100644 (file)
@@ -36,7 +36,9 @@ import org.opendaylight.controller.clustering.services.CacheExistException;
 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.configuration.ConfigurationObject;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.configuration.IConfigurationContainerService;
 import org.opendaylight.controller.sal.core.Edge;
 import org.opendaylight.controller.sal.core.Host;
 import org.opendaylight.controller.sal.core.Node;
@@ -47,12 +49,11 @@ import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.ITopologyService;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
-import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.IObjectReader;
-import org.opendaylight.controller.sal.utils.ObjectReader;
-import org.opendaylight.controller.sal.utils.ObjectWriter;
+import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
 import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
@@ -68,20 +69,22 @@ import org.slf4j.LoggerFactory;
  * topology database and notifies all the listeners of topology changes.
  */
 public class TopologyManagerImpl implements
-        ICacheUpdateAware,
+        ICacheUpdateAware<Object, Object>,
         ITopologyManager,
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
         CommandProvider {
-    static final String TOPOEDGESDB = "topologymanager.edgesDB";
-    static final String TOPOHOSTSDB = "topologymanager.hostsDB";
-    static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
-    static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+    protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
+    protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+    protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+    protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+    private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
-    private static final String SAVE = "Save";
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
+    private IConfigurationContainerService configurationService;
+    private ISwitchManager switchManager;
     // DB of all the Edges with properties which constitute our topology
     private ConcurrentMap<Edge, Set<Property>> edgesDB;
     // DB of all NodeConnector which are part of ISL Edges, meaning they
@@ -95,8 +98,6 @@ public class TopologyManagerImpl implements
     // Topology Manager Aware listeners - for clusterwide updates
     private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
             new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
-    private static String ROOT = GlobalConstants.STARTUPHOME.toString();
-    private String userLinksFileName;
     private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
@@ -162,6 +163,28 @@ public class TopologyManagerImpl implements
         }
     }
 
+    public void setConfigurationContainerService(IConfigurationContainerService service) {
+        log.trace("Got configuration service set request {}", service);
+        this.configurationService = service;
+    }
+
+    public void unsetConfigurationContainerService(IConfigurationContainerService service) {
+        log.trace("Got configuration service UNset request");
+        this.configurationService = null;
+    }
+
+    void setSwitchManager(ISwitchManager s) {
+        log.debug("Adding ISwitchManager: {}", s);
+        this.switchManager = s;
+    }
+
+    void unsetSwitchManager(ISwitchManager s) {
+        if (this.switchManager == s) {
+            log.debug("Removing ISwitchManager: {}", s);
+            this.switchManager = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
@@ -179,58 +202,43 @@ public class TopologyManagerImpl implements
             containerName = "UNKNOWN";
         }
 
-        userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
         registerWithOSGIConsole();
         loadConfiguration();
+
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
     }
 
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     private void allocateCaches() {
-        try {
             this.edgesDB =
-                    (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
-                            EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
-        }
+                    (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-        try {
             this.hostsDB =
-                    (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
-                            TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
-        }
+                    (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
-        try {
             this.nodeConnectorsDB =
-                    (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+                    (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
                             TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
-        }
-
-        try {
             this.userLinksDB =
-                    (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+                    (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
                             TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-        } catch (CacheExistException cee) {
-            log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
-        } catch (CacheConfigException cce) {
-            log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
+    }
+
+    private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
+        ConcurrentMap<?, ?> cache = null;
+        try {
+            cache = this.clusterContainerService.createCache(cacheName, cacheModes);
+        } catch (CacheExistException e) {
+            log.debug(cacheName + " cache already exists - destroy and recreate if needed");
+        } catch (CacheConfigException e) {
+            log.error(cacheName + " cache configuration invalid - check cache mode");
         }
+        return cache;
     }
 
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     private void retrieveCaches() {
         if (this.clusterContainerService == null) {
             log.error("Cluster Services is null, can't retrieve caches.");
@@ -291,16 +299,9 @@ public class TopologyManagerImpl implements
         notifyThread = null;
     }
 
-    @SuppressWarnings("unchecked")
     private void loadConfiguration() {
-        ObjectReader objReader = new ObjectReader();
-        ConcurrentMap<String, TopologyUserLinkConfig> confList =
-                (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
-
-        if (confList != null) {
-            for (TopologyUserLinkConfig conf : confList.values()) {
-                addUserLink(conf);
-            }
+        for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
+            addUserLink((TopologyUserLinkConfig) conf);
         }
     }
 
@@ -310,12 +311,10 @@ public class TopologyManagerImpl implements
     }
 
     public Status saveConfigInternal() {
-        ObjectWriter objWriter = new ObjectWriter();
+        Status saveStatus = configurationService.persistConfiguration(
+                new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
 
-        Status saveStatus = objWriter.write(
-                new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
-
-        if (! saveStatus.isSuccess()) {
+        if (!saveStatus.isSuccess()) {
             return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
         }
         return saveStatus;
@@ -385,6 +384,56 @@ public class TopologyManagerImpl implements
                 || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
     }
 
+    /**
+     * This method cross checks the determination of nodeConnector type by Discovery Service
+     * against the information in SwitchManager and updates it accordingly.
+     * @param e
+     *          The edge
+     */
+    private void crossCheckNodeConnectors(Edge e) {
+        NodeConnector nc;
+        if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
+            nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
+            if (nc != null) {
+                e.setHeadNodeConnector(nc);
+            }
+        }
+        if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
+            nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
+            if (nc != null) {
+                e.setTailNodeConnector(nc);
+            }
+        }
+    }
+
+    /**
+     * A NodeConnector may have been categorized as of type Production by Discovery Service.
+     * But at the time when this determination was made, only OF nodes were known to Discovery
+     * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
+     * so, then it returns a new NodeConnector with correct type.
+     *
+     * @param nc
+     *       NodeConnector as passed on in the edge
+     * @return
+     *       If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
+     *       with correct type, null otherwise
+     */
+
+    private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
+
+        for (Node node : switchManager.getNodes()) {
+            String nodeName = node.getNodeIDString();
+            log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
+                    nc.getNode().getNodeIDString());
+            if (nodeName.equals(nc.getNode().getNodeIDString())) {
+                NodeConnector nodeConnector = NodeConnectorCreator
+                        .createNodeConnector(node.getType(), nc.getID(), node);
+                return nodeConnector;
+            }
+        }
+        return null;
+    }
+
     /**
      * The Map returned is a copy of the current topology hence if the topology
      * changes the copy doesn't
@@ -473,7 +522,7 @@ public class TopologyManagerImpl implements
     }
 
     @Override
-    public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
+    public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
 
         // Clone the property set in case non null else just
         // create an empty one. Caches allocated via infinispan
@@ -508,23 +557,47 @@ public class TopologyManagerImpl implements
         }
     }
 
+    private boolean headNodeConnectorExist(Edge e) {
+        /*
+         * Only check the head end point which is supposed to be part of a
+         * network node we control (present in our inventory). If we checked the
+         * tail end point as well, we would not store the edges that connect to
+         * a non sdn enable port on a non sdn capable production switch. We want
+         * to be able to see these switches on the topology.
+         */
+        NodeConnector head = e.getHeadNodeConnector();
+        return (switchManager.doesNodeConnectorExist(head));
+    }
+
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
         switch (type) {
         case ADDED:
-            // Make sure the props are non-null
+
+
+            if (this.edgesDB.containsKey(e)) {
+                // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
+                log.trace("Skipping redundant edge addition: {}", e);
+                return null;
+            }
+
+            // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
             } else {
                 props = new HashSet<Property>(props);
             }
 
-            //in case of node switch-over to a different cluster controller,
-            //let's retain edge props
-            Set<Property> currentProps = this.edgesDB.get(e);
-            if (currentProps != null){
-                props.addAll(currentProps);
+
+            // Ensure that head node connector exists
+            if (!headNodeConnectorExist(e)) {
+                log.warn("Ignore edge that contains invalid node connector: {}", e);
+                return null;
             }
 
+            // Check if nodeConnectors of the edge were correctly categorized
+            // by protocol plugin
+            crossCheckNodeConnectors(e);
+
             // Now make sure there is the creation timestamp for the
             // edge, if not there, stamp with the first update
             boolean found_create = false;
@@ -575,10 +648,9 @@ public class TopologyManagerImpl implements
         case CHANGED:
             Set<Property> oldProps = this.edgesDB.get(e);
 
-            // When property changes lets make sure we can change it
+            // When property(s) changes lets make sure we can change it
             // all except the creation time stamp because that should
-            // be changed only when the edge is destroyed and created
-            // again
+            // be set only when the edge is created
             TimeStamp timeStamp = null;
             for (Property prop : oldProps) {
                 if (prop instanceof TimeStamp) {
@@ -632,18 +704,21 @@ public class TopologyManagerImpl implements
             Set<Property> p = topoedgeupdateList.get(i).getProperty();
             UpdateType type = topoedgeupdateList.get(i).getUpdateType();
             TopoEdgeUpdate teu = edgeUpdate(e, type, p);
-            teuList.add(teu);
+            if (teu != null) {
+                teuList.add(teu);
+            }
         }
 
-        // Now update the listeners
-        for (ITopologyManagerAware s : this.topologyManagerAware) {
-            try {
-                s.edgeUpdate(teuList);
-            } catch (Exception exc) {
-                log.error("Exception on edge update:", exc);
+        if (!teuList.isEmpty()) {
+            // Now update the listeners
+            for (ITopologyManagerAware s : this.topologyManagerAware) {
+                try {
+                    s.edgeUpdate(teuList);
+                } catch (Exception exc) {
+                    log.error("Exception on edge update:", exc);
+                }
             }
         }
-
     }
 
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
@@ -693,7 +768,14 @@ public class TopologyManagerImpl implements
         Edge linkTuple = getLinkTuple(userLink);
         if (linkTuple != null) {
             if (!isProductionLink(linkTuple)) {
-                edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
+                TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED,
+                                                new HashSet<Property>());
+                if (teu == null) {
+                    userLinksDB.remove(userLink.getName());
+                    return new Status(StatusCode.NOTFOUND,
+                           "Link configuration contains invalid node connector: "
+                           + userLink);
+                }
             }
 
             linkTuple = getReverseLinkTuple(userLink);
@@ -860,7 +942,7 @@ public class TopologyManagerImpl implements
     public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
             final Edge e = (Edge) key;
-            log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+            log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
             final Set<Property> props = (Set<Property>) new_value;
             edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
         }