/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.topologymanager.internal; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Date; import java.util.Dictionary; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.felix.dm.Component; import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; import org.opendaylight.controller.sal.core.Edge; import org.opendaylight.controller.sal.core.Host; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.TimeStamp; import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The class describes TopologyManager which is the central repository of the * network topology. It provides service for applications to interact with * topology database and notifies all the listeners of topology changes. */ public class TopologyManagerImpl implements ITopologyManager, IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, CommandProvider { private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); private static final String SAVE = "Save"; private ITopologyService topoService; private IClusterContainerServices clusterContainerService; // DB of all the Edges with properties which constitute our topology private ConcurrentMap> edgesDB; // DB of all NodeConnector which are part of ISL Edges, meaning they // are connected to another NodeConnector on the other side of an ISL link. // NodeConnector of a Production Edge is not part of this DB. private ConcurrentMap> nodeConnectorsDB; // DB of all the NodeConnectors with an Host attached to it private ConcurrentMap>> hostsDB; // Topology Manager Aware listeners private Set topologyManagerAware = new CopyOnWriteArraySet();; private static String ROOT = GlobalConstants.STARTUPHOME.toString(); private String userLinksFileName; private ConcurrentMap userLinksDB; private ConcurrentMap configSaveEvent; void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); hostsDB = new ConcurrentHashMap>>(); nodeConnectorsDB = new ConcurrentHashMap>(); userLinksDB = new ConcurrentHashMap(); configSaveEvent = new ConcurrentHashMap(); } void setTopologyManagerAware(ITopologyManagerAware s) { if (this.topologyManagerAware != null) { log.debug("Adding ITopologyManagerAware: {}", s); this.topologyManagerAware.add(s); } } void unsetTopologyManagerAware(ITopologyManagerAware s) { if (this.topologyManagerAware != null) { log.debug("Removing ITopologyManagerAware: {}", s); this.topologyManagerAware.remove(s); } } void setTopoService(ITopologyService s) { log.debug("Adding ITopologyService: {}", s); this.topoService = s; } void unsetTopoService(ITopologyService s) { if (this.topoService == s) { log.debug("Removing ITopologyService: {}", s); this.topoService = null; } } void setClusterContainerService(IClusterContainerServices s) { log.debug("Cluster Service set"); this.clusterContainerService = s; } void unsetClusterContainerService(IClusterContainerServices s) { if (this.clusterContainerService == s) { log.debug("Cluster Service removed!"); this.clusterContainerService = null; } } /** * Function called by the dependency manager when all the required * dependencies are satisfied * */ void init(Component c) { allocateCaches(); retrieveCaches(); String containerName = null; Dictionary props = c.getServiceProperties(); if (props != null) { containerName = (String) props.get("containerName"); } else { // In the Global instance case the containerName is empty containerName = "UNKNOWN"; } userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; registerWithOSGIConsole(); loadConfiguration(); } @SuppressWarnings({ "unchecked", "deprecation" }) private void allocateCaches(){ if (this.clusterContainerService == null) { nonClusterObjectCreate(); log.error("Cluster Services unavailable, allocated non-cluster caches!"); return; } try { this.edgesDB = (ConcurrentMap>) this.clusterContainerService.createCache( "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode"); } try { this.hostsDB = (ConcurrentMap>>) this.clusterContainerService .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode"); } try { this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode"); } try { this.userLinksDB = (ConcurrentMap) this.clusterContainerService .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode"); } try { this.configSaveEvent = (ConcurrentMap) this.clusterContainerService .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode"); } } @SuppressWarnings({ "unchecked", "deprecation" }) private void retrieveCaches() { if (this.clusterContainerService == null) { log.error("Cluster Services is null, can't retrieve caches."); return; } this.edgesDB = (ConcurrentMap>) this.clusterContainerService .getCache("topologymanager.edgesDB"); if (edgesDB == null) { log.error("Failed to get cache for topologymanager.edgesDB"); } this.hostsDB = (ConcurrentMap>>) this.clusterContainerService .getCache("topologymanager.hostsDB"); if (hostsDB == null) { log.error("Failed to get cache for topologymanager.hostsDB"); } this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService .getCache("topologymanager.nodeConnectorDB"); if (nodeConnectorsDB == null) { log.error("Failed to get cache for topologymanager.nodeConnectorDB"); } this.userLinksDB = (ConcurrentMap) this.clusterContainerService .getCache("topologymanager.userLinksDB"); if (userLinksDB == null) { log.error("Failed to get cache for topologymanager.userLinksDB"); } this.configSaveEvent = (ConcurrentMap) this.clusterContainerService .getCache("topologymanager.configSaveEvent"); if (configSaveEvent == null) { log.error("Failed to get cache for topologymanager.configSaveEvent"); } } /** * Function called after the topology manager has registered the service in * OSGi service registry. * */ void started() { // SollicitRefresh MUST be called here else if called at init // time it may sollicit refresh too soon. log.debug("Sollicit topology refresh"); topoService.sollicitRefresh(); } /** * Function called by the dependency manager when at least one dependency * become unsatisfied or when the component is shutting down because for * example bundle is being stopped. * */ void destroy() { } @SuppressWarnings("unchecked") private void loadConfiguration() { ObjectReader objReader = new ObjectReader(); ConcurrentMap confList = (ConcurrentMap) objReader.read(this, userLinksFileName); if (confList != null) { for (TopologyUserLinkConfig conf : confList.values()) { addUserLink(conf); } } } @Override public Status saveConfig() { // Publish the save config event to the cluster configSaveEvent.put(new Date().getTime(), SAVE ); return saveConfigInternal(); } public Status saveConfigInternal() { ObjectWriter objWriter = new ObjectWriter(); Status saveStatus = objWriter.write( new ConcurrentHashMap(userLinksDB), userLinksFileName); if (! saveStatus.isSuccess()) { return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription()); } return saveStatus; } @Override public Map> getNodeEdges() { if (this.edgesDB == null) { return null; } Map> res = new HashMap>(); for (Edge edge : this.edgesDB.keySet()) { // Lets analyze the tail Node node = edge.getTailNodeConnector().getNode(); Set nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); res.put(node, nodeEdges); } nodeEdges.add(edge); // Lets analyze the head node = edge.getHeadNodeConnector().getNode(); nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); res.put(node, nodeEdges); } nodeEdges.add(edge); } return res; } @Override public boolean isInternal(NodeConnector p) { if (this.nodeConnectorsDB == null) { return false; } // This is an internal NodeConnector if is connected to // another Node i.e it's part of the nodeConnectorsDB return (this.nodeConnectorsDB.get(p) != null); } /** * This method returns true if the edge is an ISL link. * * @param e * The edge * @return true if it is an ISL link */ public boolean isISLink(Edge e) { return (!isProductionLink(e)); } /** * This method returns true if the edge is a production link. * * @param e * The edge * @return true if it is a production link */ public boolean isProductionLink(Edge e) { return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION) || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); } /** * The Map returned is a copy of the current topology hence if the topology * changes the copy doesn't * * @return A Map representing the current topology expressed as edges of the * network */ @Override public Map> getEdges() { if (this.edgesDB == null) { return null; } Map> edgeMap = new HashMap>(); Set props; for (Map.Entry> edgeEntry : edgesDB.entrySet()) { // Sets of props are copied because the composition of // those properties could change with time props = new HashSet(edgeEntry.getValue()); // We can simply reuse the key because the object is // immutable so doesn't really matter that we are // referencing the only owned by a different table, the // meaning is the same because doesn't change with time. edgeMap.put(edgeEntry.getKey(), props); } return edgeMap; } @Override public Set getNodeConnectorWithHost() { if (this.hostsDB == null) { return null; } return (new HashSet(this.hostsDB.keySet())); } @Override public Map> getNodesWithNodeConnectorHost() { if (this.hostsDB == null) { return null; } HashMap> res = new HashMap>(); Node node; Set portSet; for (NodeConnector nc : this.hostsDB.keySet()) { node = nc.getNode(); portSet = res.get(node); if (portSet == null) { // Create the HashSet if null portSet = new HashSet(); res.put(node, portSet); } // Keep updating the HashSet, given this is not a // clustered map we can just update the set without // worrying to update the hashmap. portSet.add(nc); } return (res); } @Override public Host getHostAttachedToNodeConnector(NodeConnector port) { ImmutablePair> host; if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) { return null; } return host.getLeft(); } @Override public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { // Clone the property set in case non null else just // create an empty one. Caches allocated via infinispan // don't allow null values if (props == null) { props = new HashSet(); } else { props = new HashSet(props); } ImmutablePair> thisHost = new ImmutablePair>(h, props); switch (t) { case ADDED: case CHANGED: this.hostsDB.put(port, thisHost); break; case REMOVED: //remove only if hasn't been concurrently modified this.hostsDB.remove(port, thisHost); break; } } private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { switch (type) { case ADDED: // Make sure the props are non-null if (props == null) { props = new HashSet(); } else { props = new HashSet(props); } //in case of node switch-over to a different cluster controller, //let's retain edge props Set currentProps = this.edgesDB.get(e); if (currentProps != null){ props.addAll(currentProps); } // Now make sure there is the creation timestamp for the // edge, if not there, stamp with the first update boolean found_create = false; for (Property prop : props) { if (prop instanceof TimeStamp) { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { found_create = true; break; } } } if (!found_create) { TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation"); props.add(t); } // Now add this in the database eventually overriding // something that may have been already existing this.edgesDB.put(e, props); // Now populate the DB of NodeConnectors // NOTE WELL: properties are empty sets, not really needed // for now. // The DB only contains ISL ports if (isISLink(e)) { this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet(1)); this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet(1)); } log.trace("Edge {} {}", e.toString(), type.name()); break; case REMOVED: // Now remove the edge from edgesDB this.edgesDB.remove(e); // Now lets update the NodeConnectors DB, the assumption // here is that two NodeConnector are exclusively // connected by 1 and only 1 edge, this is reasonable in // the same plug (virtual of phisical) we can assume two // cables won't be plugged. This could break only in case // of devices in the middle that acts as hubs, but it // should be safe to assume that won't happen. this.nodeConnectorsDB.remove(e.getHeadNodeConnector()); this.nodeConnectorsDB.remove(e.getTailNodeConnector()); log.trace("Edge {} {}", e.toString(), type.name()); break; case CHANGED: Set oldProps = this.edgesDB.get(e); // When property changes lets make sure we can change it // all except the creation time stamp because that should // be changed only when the edge is destroyed and created // again TimeStamp timeStamp = null; for (Property prop : oldProps) { if (prop instanceof TimeStamp) { TimeStamp tsProp = (TimeStamp) prop; if (tsProp.getTimeStampName().equals("creation")) { timeStamp = tsProp; break; } } } // Now lets make sure new properties are non-null if (props == null) { props = new HashSet(); } else { // Copy the set so noone is going to change the content props = new HashSet(props); } // Now lets remove the creation property if exist in the // new props for (Iterator i = props.iterator(); i.hasNext();) { Property prop = i.next(); if (prop instanceof TimeStamp) { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { i.remove(); break; } } } // Now lets add the creation timestamp in it if (timeStamp != null) { props.add(timeStamp); } // Finally update this.edgesDB.put(e, props); log.trace("Edge {} {}", e.toString(), type.name()); break; } return new TopoEdgeUpdate(e, props, type); } @Override public void edgeUpdate(List topoedgeupdateList) { List teuList = new ArrayList(); for (int i = 0; i < topoedgeupdateList.size(); i++) { Edge e = topoedgeupdateList.get(i).getEdge(); Set p = topoedgeupdateList.get(i).getProperty(); UpdateType type = topoedgeupdateList.get(i).getUpdateType(); TopoEdgeUpdate teu = edgeUpdate(e, type, p); teuList.add(teu); } // Now update the listeners for (ITopologyManagerAware s : this.topologyManagerAware) { try { s.edgeUpdate(teuList); } catch (Exception exc) { log.error("Exception on edge update:", exc); } } } private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { TopologyUserLinkConfig rLink = new TopologyUserLinkConfig( link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector()); return getLinkTuple(rLink); } private Edge getLinkTuple(TopologyUserLinkConfig link) { NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector()); NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector()); try { return new Edge(srcNodeConnector, dstNodeConnector); } catch (Exception e) { return null; } } @Override public ConcurrentMap getUserLinks() { return new ConcurrentHashMap(userLinksDB); } @Override public Status addUserLink(TopologyUserLinkConfig userLink) { if (!userLink.isValid()) { return new Status(StatusCode.BADREQUEST, "User link configuration invalid."); } userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); //Check if this link already configured //NOTE: infinispan cache doesn't support Map.containsValue() // (which is linear time in most ConcurrentMap impl anyway) for (TopologyUserLinkConfig existingLink : userLinksDB.values()) { if (existingLink.equals(userLink)) { return new Status(StatusCode.CONFLICT, "Link configuration exists"); } } //attempt put, if mapping for this key already existed return conflict if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) { return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName() + " already exists. Please use another name"); } Edge linkTuple = getLinkTuple(userLink); if (linkTuple != null) { if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } linkTuple = getReverseLinkTuple(userLink); if (linkTuple != null) { userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } } } return new Status(StatusCode.SUCCESS); } @Override public Status deleteUserLink(String linkName) { if (linkName == null) { return new Status(StatusCode.BADREQUEST, "User link name cannot be null."); } TopologyUserLinkConfig link = userLinksDB.remove(linkName); Edge linkTuple; if (link != null && (linkTuple = getLinkTuple(link)) != null) { if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } linkTuple = getReverseLinkTuple(link); if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } } return new Status(StatusCode.SUCCESS); } private void registerWithOSGIConsole() { BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()) .getBundleContext(); bundleContext.registerService(CommandProvider.class.getName(), this, null); } @Override public String getHelp() { StringBuffer help = new StringBuffer(); help.append("---Topology Manager---\n"); help.append("\t addUserLink \n"); help.append("\t deleteUserLink \n"); help.append("\t printUserLink\n"); help.append("\t printNodeEdges\n"); return help.toString(); } public void _printUserLink(CommandInterpreter ci) { for (String name : this.userLinksDB.keySet()) { TopologyUserLinkConfig linkConfig = userLinksDB.get(name); ci.println("Name : " + name); ci.println(linkConfig); ci.println("Edge " + getLinkTuple(linkConfig)); ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig)); } } public void _addUserLink(CommandInterpreter ci) { String name = ci.nextArgument(); if ((name == null)) { ci.println("Please enter a valid Name"); return; } String ncStr1 = ci.nextArgument(); if (ncStr1 == null) { ci.println("Please enter two node connector strings"); return; } String ncStr2 = ci.nextArgument(); if (ncStr2 == null) { ci.println("Please enter second node connector string"); return; } NodeConnector nc1 = NodeConnector.fromString(ncStr1); if (nc1 == null) { ci.println("Invalid input node connector 1 string: " + ncStr1); return; } NodeConnector nc2 = NodeConnector.fromString(ncStr2); if (nc2 == null) { ci.println("Invalid input node connector 2 string: " + ncStr2); return; } TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2); ci.println(this.addUserLink(config)); } public void _deleteUserLink(CommandInterpreter ci) { String name = ci.nextArgument(); if ((name == null)) { ci.println("Please enter a valid Name"); return; } this.deleteUserLink(name); } public void _printNodeEdges(CommandInterpreter ci) { Map> nodeEdges = getNodeEdges(); if (nodeEdges == null) { return; } Set nodeSet = nodeEdges.keySet(); if (nodeSet == null) { return; } ci.println(" Node Edge"); for (Node node : nodeSet) { Set edgeSet = nodeEdges.get(node); if (edgeSet == null) { continue; } for (Edge edge : edgeSet) { ci.println(node + " " + edge); } } } @Override public Object readObject(ObjectInputStream ois) throws FileNotFoundException, IOException, ClassNotFoundException { return ois.readObject(); } @Override public Status saveConfiguration() { return saveConfig(); } @Override public void edgeOverUtilized(Edge edge) { log.warn("Link Utilization above normal: {}", edge); } @Override public void edgeUtilBackToNormal(Edge edge) { log.warn("Link Utilization back to normal: {}", edge); } }