X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Ftopologymanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Ftopologymanager%2Finternal%2FTopologyManagerImpl.java;h=ff1c026a344d74efa5aa1f1b57e05e60775cd1a4;hp=004d1b98db3eff61292937888a6fa34b1f9a4ee6;hb=d9de6c2ddfb30eb2eee782c229f6e03cef352bbd;hpb=52757c15dc010e68ef15899daf50f78291966bee diff --git a/opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 004d1b98db..ff1c026a34 100644 --- a/opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -36,7 +36,9 @@ import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.configuration.ConfigurationObject; import org.opendaylight.controller.configuration.IConfigurationContainerAware; +import org.opendaylight.controller.configuration.IConfigurationContainerService; import org.opendaylight.controller.sal.core.Edge; import org.opendaylight.controller.sal.core.Host; import org.opendaylight.controller.sal.core.Node; @@ -47,12 +49,11 @@ import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; -import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.IObjectReader; -import org.opendaylight.controller.sal.utils.ObjectReader; -import org.opendaylight.controller.sal.utils.ObjectWriter; +import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; @@ -68,20 +69,22 @@ import org.slf4j.LoggerFactory; * topology database and notifies all the listeners of topology changes. */ public class TopologyManagerImpl implements - ICacheUpdateAware, + ICacheUpdateAware, ITopologyManager, IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, CommandProvider { - static final String TOPOEDGESDB = "topologymanager.edgesDB"; - static final String TOPOHOSTSDB = "topologymanager.hostsDB"; - static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB"; - static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; + protected static final String TOPOEDGESDB = "topologymanager.edgesDB"; + protected static final String TOPOHOSTSDB = "topologymanager.hostsDB"; + protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB"; + protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; + private static final String USER_LINKS_FILE_NAME = "userTopology.conf"; private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); - private static final String SAVE = "Save"; private ITopologyService topoService; private IClusterContainerServices clusterContainerService; + private IConfigurationContainerService configurationService; + private ISwitchManager switchManager; // DB of all the Edges with properties which constitute our topology private ConcurrentMap> edgesDB; // DB of all NodeConnector which are part of ISL Edges, meaning they @@ -95,8 +98,6 @@ public class TopologyManagerImpl implements // Topology Manager Aware listeners - for clusterwide updates private Set topologyManagerClusterWideAware = new CopyOnWriteArraySet(); - private static String ROOT = GlobalConstants.STARTUPHOME.toString(); - private String userLinksFileName; private ConcurrentMap userLinksDB; private BlockingQueue notifyQ = new LinkedBlockingQueue(); private volatile Boolean shuttingDown = false; @@ -162,6 +163,28 @@ public class TopologyManagerImpl implements } } + public void setConfigurationContainerService(IConfigurationContainerService service) { + log.trace("Got configuration service set request {}", service); + this.configurationService = service; + } + + public void unsetConfigurationContainerService(IConfigurationContainerService service) { + log.trace("Got configuration service UNset request"); + this.configurationService = null; + } + + void setSwitchManager(ISwitchManager s) { + log.debug("Adding ISwitchManager: {}", s); + this.switchManager = s; + } + + void unsetSwitchManager(ISwitchManager s) { + if (this.switchManager == s) { + log.debug("Removing ISwitchManager: {}", s); + this.switchManager = null; + } + } + /** * Function called by the dependency manager when all the required * dependencies are satisfied @@ -179,58 +202,43 @@ public class TopologyManagerImpl implements containerName = "UNKNOWN"; } - userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; registerWithOSGIConsole(); loadConfiguration(); + // Restore the shuttingDown status on init of the component shuttingDown = false; notifyThread = new Thread(new TopologyNotify(notifyQ)); } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void allocateCaches() { - try { this.edgesDB = - (ConcurrentMap>) this.clusterContainerService.createCache(TOPOEDGESDB, - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - } catch (CacheExistException cee) { - log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed"); - } catch (CacheConfigException cce) { - log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode"); - } + (ConcurrentMap>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - try { this.hostsDB = - (ConcurrentMap>>>) this.clusterContainerService.createCache( - TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - } catch (CacheExistException cee) { - log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed"); - } catch (CacheConfigException cce) { - log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode"); - } + (ConcurrentMap>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - try { this.nodeConnectorsDB = - (ConcurrentMap>) this.clusterContainerService.createCache( + (ConcurrentMap>) allocateCache( TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - } catch (CacheExistException cee) { - log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed"); - } catch (CacheConfigException cce) { - log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode"); - } - - try { this.userLinksDB = - (ConcurrentMap) this.clusterContainerService.createCache( + (ConcurrentMap) allocateCache( TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - } catch (CacheExistException cee) { - log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed"); - } catch (CacheConfigException cce) { - log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode"); + } + + private ConcurrentMap allocateCache(String cacheName, Set cacheModes) { + ConcurrentMap cache = null; + try { + cache = this.clusterContainerService.createCache(cacheName, cacheModes); + } catch (CacheExistException e) { + log.debug(cacheName + " cache already exists - destroy and recreate if needed"); + } catch (CacheConfigException e) { + log.error(cacheName + " cache configuration invalid - check cache mode"); } + return cache; } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void retrieveCaches() { if (this.clusterContainerService == null) { log.error("Cluster Services is null, can't retrieve caches."); @@ -291,16 +299,9 @@ public class TopologyManagerImpl implements notifyThread = null; } - @SuppressWarnings("unchecked") private void loadConfiguration() { - ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = - (ConcurrentMap) objReader.read(this, userLinksFileName); - - if (confList != null) { - for (TopologyUserLinkConfig conf : confList.values()) { - addUserLink(conf); - } + for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) { + addUserLink((TopologyUserLinkConfig) conf); } } @@ -310,12 +311,10 @@ public class TopologyManagerImpl implements } public Status saveConfigInternal() { - ObjectWriter objWriter = new ObjectWriter(); - - Status saveStatus = objWriter.write( - new ConcurrentHashMap(userLinksDB), userLinksFileName); + Status saveStatus = configurationService.persistConfiguration( + new ArrayList(userLinksDB.values()), USER_LINKS_FILE_NAME); - if (! saveStatus.isSuccess()) { + if (!saveStatus.isSuccess()) { return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription()); } return saveStatus; @@ -385,6 +384,56 @@ public class TopologyManagerImpl implements || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); } + /** + * This method cross checks the determination of nodeConnector type by Discovery Service + * against the information in SwitchManager and updates it accordingly. + * @param e + * The edge + */ + private void crossCheckNodeConnectors(Edge e) { + NodeConnector nc; + if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) { + nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector()); + if (nc != null) { + e.setHeadNodeConnector(nc); + } + } + if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) { + nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector()); + if (nc != null) { + e.setTailNodeConnector(nc); + } + } + } + + /** + * A NodeConnector may have been categorized as of type Production by Discovery Service. + * But at the time when this determination was made, only OF nodes were known to Discovery + * Service. This method checks if the node of nodeConnector is known to SwitchManager. If + * so, then it returns a new NodeConnector with correct type. + * + * @param nc + * NodeConnector as passed on in the edge + * @return + * If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector + * with correct type, null otherwise + */ + + private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) { + + for (Node node : switchManager.getNodes()) { + String nodeName = node.getNodeIDString(); + log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName, + nc.getNode().getNodeIDString()); + if (nodeName.equals(nc.getNode().getNodeIDString())) { + NodeConnector nodeConnector = NodeConnectorCreator + .createNodeConnector(node.getType(), nc.getID(), node); + return nodeConnector; + } + } + return null; + } + /** * The Map returned is a copy of the current topology hence if the topology * changes the copy doesn't @@ -473,7 +522,7 @@ public class TopologyManagerImpl implements } @Override - public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { + public synchronized void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { // Clone the property set in case non null else just // create an empty one. Caches allocated via infinispan @@ -508,23 +557,47 @@ public class TopologyManagerImpl implements } } + private boolean headNodeConnectorExist(Edge e) { + /* + * Only check the head end point which is supposed to be part of a + * network node we control (present in our inventory). If we checked the + * tail end point as well, we would not store the edges that connect to + * a non sdn enable port on a non sdn capable production switch. We want + * to be able to see these switches on the topology. + */ + NodeConnector head = e.getHeadNodeConnector(); + return (switchManager.doesNodeConnectorExist(head)); + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { switch (type) { case ADDED: - // Make sure the props are non-null + + + if (this.edgesDB.containsKey(e)) { + // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks + log.trace("Skipping redundant edge addition: {}", e); + return null; + } + + // Make sure the props are non-null or create a copy if (props == null) { props = new HashSet(); } else { props = new HashSet(props); } - //in case of node switch-over to a different cluster controller, - //let's retain edge props - Set currentProps = this.edgesDB.get(e); - if (currentProps != null){ - props.addAll(currentProps); + + // Ensure that head node connector exists + if (!headNodeConnectorExist(e)) { + log.warn("Ignore edge that contains invalid node connector: {}", e); + return null; } + // Check if nodeConnectors of the edge were correctly categorized + // by protocol plugin + crossCheckNodeConnectors(e); + // Now make sure there is the creation timestamp for the // edge, if not there, stamp with the first update boolean found_create = false; @@ -575,10 +648,9 @@ public class TopologyManagerImpl implements case CHANGED: Set oldProps = this.edgesDB.get(e); - // When property changes lets make sure we can change it + // When property(s) changes lets make sure we can change it // all except the creation time stamp because that should - // be changed only when the edge is destroyed and created - // again + // be set only when the edge is created TimeStamp timeStamp = null; for (Property prop : oldProps) { if (prop instanceof TimeStamp) { @@ -632,18 +704,21 @@ public class TopologyManagerImpl implements Set p = topoedgeupdateList.get(i).getProperty(); UpdateType type = topoedgeupdateList.get(i).getUpdateType(); TopoEdgeUpdate teu = edgeUpdate(e, type, p); - teuList.add(teu); + if (teu != null) { + teuList.add(teu); + } } - // Now update the listeners - for (ITopologyManagerAware s : this.topologyManagerAware) { - try { - s.edgeUpdate(teuList); - } catch (Exception exc) { - log.error("Exception on edge update:", exc); + if (!teuList.isEmpty()) { + // Now update the listeners + for (ITopologyManagerAware s : this.topologyManagerAware) { + try { + s.edgeUpdate(teuList); + } catch (Exception exc) { + log.error("Exception on edge update:", exc); + } } } - } private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { @@ -693,7 +768,14 @@ public class TopologyManagerImpl implements Edge linkTuple = getLinkTuple(userLink); if (linkTuple != null) { if (!isProductionLink(linkTuple)) { - edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); + TopoEdgeUpdate teu = edgeUpdate(linkTuple, UpdateType.ADDED, + new HashSet()); + if (teu == null) { + userLinksDB.remove(userLink.getName()); + return new Status(StatusCode.NOTFOUND, + "Link configuration contains invalid node connector: " + + userLink); + } } linkTuple = getReverseLinkTuple(userLink); @@ -860,7 +942,7 @@ public class TopologyManagerImpl implements public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) { if (cacheName.equals(TOPOEDGESDB)) { final Edge e = (Edge) key; - log.trace("Edge {} CHANGED isLocal:{}", e, originLocal); + log.trace("Edge {} UPDATED isLocal:{}", e, originLocal); final Set props = (Set) new_value; edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal); } @@ -918,10 +1000,10 @@ public class TopologyManagerImpl implements // Lets sleep for sometime to allow aggregation of event Thread.sleep(100); } catch (InterruptedException e1) { - log.warn("TopologyNotify interrupted {}", e1.getMessage()); if (shuttingDown) { return; } + log.warn("TopologyNotify interrupted {}", e1.getMessage()); } catch (Exception e2) { log.error("", e2); }