X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Ftopologymanager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Ftopologymanager%2Finternal%2FTopologyManagerImpl.java;h=4972d3b5b5d73d41df3cfbbb99543db8fde2eb56;hp=9d24cc6fa89994a7014b15458010631e6ebb74f5;hb=8984fee53ee5f4a99eed43b01ad286ba0e781dd6;hpb=0907e1ae7a962369b6f8cf2ac9c49e9ae2545eea diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 9d24cc6fa8..4972d3b5b5 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -12,16 +11,21 @@ package org.opendaylight.controller.topologymanager.internal; import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Date; import java.util.Dictionary; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.felix.dm.Component; @@ -29,10 +33,10 @@ import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; -import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.Edge; import org.opendaylight.controller.sal.core.Host; import org.opendaylight.controller.sal.core.Node; @@ -42,15 +46,16 @@ import org.opendaylight.controller.sal.core.TimeStamp; import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; -import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; import org.opendaylight.controller.sal.utils.GlobalConstants; -import org.opendaylight.controller.sal.utils.HexEncode; import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; @@ -62,55 +67,85 @@ import org.slf4j.LoggerFactory; * network topology. It provides service for applications to interact with * topology database and notifies all the listeners of topology changes. */ -public class TopologyManagerImpl implements ITopologyManager, - IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, +public class TopologyManagerImpl implements + ICacheUpdateAware, + ITopologyManager, + IConfigurationContainerAware, + IListenTopoUpdates, + IObjectReader, CommandProvider { - private static final Logger log = LoggerFactory - .getLogger(TopologyManagerImpl.class); - private ITopologyService topoService = null; - private IClusterContainerServices clusterContainerService = null; + static final String TOPOEDGESDB = "topologymanager.edgesDB"; + static final String TOPOHOSTSDB = "topologymanager.hostsDB"; + static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB"; + static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; + private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final String SAVE = "Save"; + private ITopologyService topoService; + private IClusterContainerServices clusterContainerService; // DB of all the Edges with properties which constitute our topology - private ConcurrentMap> edgesDB = null; - // DB of all NodeConnector which are part of Edges, meaning they - // are connected to another NodeConnector on the other side - private ConcurrentMap> nodeConnectorsDB = null; + private ConcurrentMap> edgesDB; + // DB of all NodeConnector which are part of ISL Edges, meaning they + // are connected to another NodeConnector on the other side of an ISL link. + // NodeConnector of a Production Edge is not part of this DB. + private ConcurrentMap> nodeConnectorsDB; // DB of all the NodeConnectors with an Host attached to it - private ConcurrentMap>> hostsDB = null; + private ConcurrentMap>> hostsDB; // Topology Manager Aware listeners - private Set topologyManagerAware = Collections - .synchronizedSet(new HashSet()); - + private Set topologyManagerAware = new CopyOnWriteArraySet(); + // Topology Manager Aware listeners - for clusterwide updates + private Set topologyManagerClusterWideAware = + new CopyOnWriteArraySet(); private static String ROOT = GlobalConstants.STARTUPHOME.toString(); - private String userLinksFileName = null; - private ConcurrentMap userLinks; - + private String userLinksFileName; + private ConcurrentMap userLinksDB; + private BlockingQueue notifyQ = new LinkedBlockingQueue(); + private volatile Boolean shuttingDown = false; + private Thread notifyThread; + + void nonClusterObjectCreate() { - edgesDB = new ConcurrentHashMap>(); - hostsDB = new ConcurrentHashMap>>(); - userLinks = new ConcurrentHashMap(); - nodeConnectorsDB = new ConcurrentHashMap>(); + edgesDB = new ConcurrentHashMap>(); + hostsDB = new ConcurrentHashMap>>(); + nodeConnectorsDB = new ConcurrentHashMap>(); + userLinksDB = new ConcurrentHashMap(); } - void setTopologyManagerAware(ITopologyManagerAware s) { if (this.topologyManagerAware != null) { - log.debug("Adding ITopologyManagerAware: " + s); + log.debug("Adding ITopologyManagerAware: {}", s); this.topologyManagerAware.add(s); } } void unsetTopologyManagerAware(ITopologyManagerAware s) { if (this.topologyManagerAware != null) { + log.debug("Removing ITopologyManagerAware: {}", s); this.topologyManagerAware.remove(s); } } + void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Adding ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.add(s); + } + } + + void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Removing ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.remove(s); + } + } + void setTopoService(ITopologyService s) { + log.debug("Adding ITopologyService: {}", s); this.topoService = s; } void unsetTopoService(ITopologyService s) { if (this.topoService == s) { + log.debug("Removing ITopologyService: {}", s); this.topoService = null; } } @@ -133,8 +168,10 @@ public class TopologyManagerImpl implements ITopologyManager, * */ void init(Component c) { + allocateCaches(); + retrieveCaches(); String containerName = null; - Dictionary props = c.getServiceProperties(); + Dictionary props = c.getServiceProperties(); if (props != null) { containerName = (String) props.get("containerName"); } else { @@ -142,134 +179,146 @@ public class TopologyManagerImpl implements ITopologyManager, containerName = "UNKNOWN"; } - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - return; - } + userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; + registerWithOSGIConsole(); + loadConfiguration(); + // Restore the shuttingDown status on init of the component + shuttingDown = false; + notifyThread = new Thread(new TopologyNotify(notifyQ)); + } - if (this.topoService == null) { - log.error("Topology Services is null, not expected!"); - return; + @SuppressWarnings({ "unchecked", "deprecation" }) + private void allocateCaches() { + try { + this.edgesDB = + (ConcurrentMap>) this.clusterContainerService.createCache(TOPOEDGESDB, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + } catch (CacheExistException cee) { + log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed"); + } catch (CacheConfigException cce) { + log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode"); } try { - this.edgesDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.edgesDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.createCache( + TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.edgesDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.edgesDB Cache configuration invalid - " - + "check cache mode"); + log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode"); } try { - this.hostsDB = (ConcurrentMap>>) this.clusterContainerService - .createCache("topologymanager.hostsDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.createCache( + TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.hostsDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.hostsDB Cache configuration invalid - " - + "check cache mode"); + log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode"); } try { - this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.nodeConnectorDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.createCache( + TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.nodeConnectorDB Cache already exists" - + " - destroy and recreate if needed"); + log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.nodeConnectorDB Cache configuration " - + "invalid - check cache mode"); + log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode"); } + } - userLinks = new ConcurrentHashMap(); + @SuppressWarnings({ "unchecked", "deprecation" }) + private void retrieveCaches() { + if (this.clusterContainerService == null) { + log.error("Cluster Services is null, can't retrieve caches."); + return; + } - userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; - registerWithOSGIConsole(); - loadConfiguration(); + this.edgesDB = (ConcurrentMap>) this.clusterContainerService.getCache(TOPOEDGESDB); + if (edgesDB == null) { + log.error("Failed to get cache for " + TOPOEDGESDB); + } + + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.getCache(TOPOHOSTSDB); + if (hostsDB == null) { + log.error("Failed to get cache for " + TOPOHOSTSDB); + } + + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.getCache(TOPONODECONNECTORDB); + if (nodeConnectorsDB == null) { + log.error("Failed to get cache for " + TOPONODECONNECTORDB); + } + + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.getCache(TOPOUSERLINKSDB); + if (userLinksDB == null) { + log.error("Failed to get cache for " + TOPOUSERLINKSDB); + } } /** - * Function called after the topology manager has registered the - * service in OSGi service registry. + * Function called after the topology manager has registered the service in + * OSGi service registry. * */ void started() { + // Start the batcher thread for the cluster wide topology updates + notifyThread.start(); // SollicitRefresh MUST be called here else if called at init // time it may sollicit refresh too soon. log.debug("Sollicit topology refresh"); topoService.sollicitRefresh(); } + void stop() { + shuttingDown = true; + notifyThread.interrupt(); + } + /** - * Function called by the dependency manager when at least one - * dependency become unsatisfied or when the component is shutting - * down because for example bundle is being stopped. + * Function called by the dependency manager when at least one dependency + * become unsatisfied or when the component is shutting down because for + * example bundle is being stopped. * */ void destroy() { - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - this.edgesDB = null; - this.hostsDB = null; - this.nodeConnectorsDB = null; - return; - } - this.clusterContainerService.destroyCache("topologymanager.edgesDB"); - this.edgesDB = null; - this.clusterContainerService.destroyCache("topologymanager.hostsDB"); - this.hostsDB = null; - this.clusterContainerService - .destroyCache("topologymanager.nodeConnectorDB"); - this.nodeConnectorsDB = null; - log.debug("Topology Manager DB DE-allocated"); + notifyQ.clear(); + notifyThread = null; } @SuppressWarnings("unchecked") private void loadConfiguration() { ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = (ConcurrentMap) objReader - .read(this, userLinksFileName); - - if (confList == null) { - return; - } + ConcurrentMap confList = + (ConcurrentMap) objReader.read(this, userLinksFileName); - for (TopologyUserLinkConfig conf : confList.values()) { - addUserLink(conf); + if (confList != null) { + for (TopologyUserLinkConfig conf : confList.values()) { + addUserLink(conf); + } } } @Override public Status saveConfig() { - // Publish the save config event to the cluster nodes - /** - * Get the CLUSTERING SERVICES WORKING BEFORE TRYING THIS - - configSaveEvent.put(new Date().getTime(), SAVE); - */ return saveConfigInternal(); } public Status saveConfigInternal() { - Status retS; ObjectWriter objWriter = new ObjectWriter(); - retS = objWriter.write( - new ConcurrentHashMap( - userLinks), userLinksFileName); + Status saveStatus = objWriter.write( + new ConcurrentHashMap(userLinksDB), userLinksFileName); - if (retS.isSuccess()) { - return retS; - } else { - return new Status(StatusCode.INTERNALERROR, "Save failed"); + if (! saveStatus.isSuccess()) { + return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription()); } + return saveStatus; } @Override @@ -278,31 +327,25 @@ public class TopologyManagerImpl implements ITopologyManager, return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> res = new HashMap>(); + for (Edge edge : this.edgesDB.keySet()) { // Lets analyze the tail - Node node = key.getTailNodeConnector().getNode(); + Node node = edge.getTailNodeConnector().getNode(); Set nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); // Lets analyze the head - node = key.getHeadNodeConnector().getNode(); + node = edge.getHeadNodeConnector().getNode(); nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); } return res; @@ -320,11 +363,34 @@ public class TopologyManagerImpl implements ITopologyManager, } /** - * The Map returned is a copy of the current topology hence if the - * topology changes the copy doesn't + * This method returns true if the edge is an ISL link. + * + * @param e + * The edge + * @return true if it is an ISL link + */ + public boolean isISLink(Edge e) { + return (!isProductionLink(e)); + } + + /** + * This method returns true if the edge is a production link. + * + * @param e + * The edge + * @return true if it is a production link + */ + public boolean isProductionLink(Edge e) { + return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION) + || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); + } + + /** + * The Map returned is a copy of the current topology hence if the topology + * changes the copy doesn't * - * @return A Map representing the current topology expressed as - * edges of the network + * @return A Map representing the current topology expressed as edges of the + * network */ @Override public Map> getEdges() { @@ -332,30 +398,20 @@ public class TopologyManagerImpl implements ITopologyManager, return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> edgeMap = new HashMap>(); + Set props; + for (Map.Entry> edgeEntry : edgesDB.entrySet()) { // Sets of props are copied because the composition of // those properties could change with time - HashSet prop = new HashSet(this.edgesDB - .get(key)); + props = new HashSet(edgeEntry.getValue()); // We can simply reuse the key because the object is // immutable so doesn't really matter that we are // referencing the only owned by a different table, the // meaning is the same because doesn't change with time. - res.put(key, prop); + edgeMap.put(edgeEntry.getKey(), props); } - return res; - } - - // TODO remove with spring-dm removal - /** - * @param set the topologyAware to set - */ - public void setTopologyAware(Set set) { - for (Object s : set) { - setTopologyManagerAware((ITopologyManagerAware) s); - } + return edgeMap; } @Override @@ -364,7 +420,7 @@ public class TopologyManagerImpl implements ITopologyManager, return null; } - return (this.hostsDB.keySet()); + return (new HashSet(this.hostsDB.keySet())); } @Override @@ -373,88 +429,92 @@ public class TopologyManagerImpl implements ITopologyManager, return null; } HashMap> res = new HashMap>(); - - for (NodeConnector p : this.hostsDB.keySet()) { - Node n = p.getNode(); - Set pSet = res.get(n); - if (pSet == null) { + Node node; + Set portSet; + for (NodeConnector nc : this.hostsDB.keySet()) { + node = nc.getNode(); + portSet = res.get(node); + if (portSet == null) { // Create the HashSet if null - pSet = new HashSet(); - res.put(n, pSet); + portSet = new HashSet(); + res.put(node, portSet); } // Keep updating the HashSet, given this is not a // clustered map we can just update the set without // worrying to update the hashmap. - pSet.add(p); + portSet.add(nc); } return (res); } @Override - public Host getHostAttachedToNodeConnector(NodeConnector p) { - if (this.hostsDB == null) { + public Host getHostAttachedToNodeConnector(NodeConnector port) { + ImmutablePair> host; + if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) { return null; } - - return (this.hostsDB.get(p).getLeft()); + return host.getLeft(); } @Override - public void updateHostLink(NodeConnector p, Host h, UpdateType t, - Set props) { - if (this.hostsDB == null) { - return; + public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { + + // Clone the property set in case non null else just + // create an empty one. Caches allocated via infinispan + // don't allow null values + if (props == null) { + props = new HashSet(); + } else { + props = new HashSet(props); } + ImmutablePair> thisHost = new ImmutablePair>(h, props); switch (t) { case ADDED: case CHANGED: - // Clone the property set in case non null else just - // create an empty one. Caches allocated via infinispan - // don't allow null values - if (props == null) { - props = new HashSet(); - } else { - props = new HashSet(props); - } - - this.hostsDB.put(p, new ImmutablePair(h, props)); + this.hostsDB.put(port, thisHost); break; case REMOVED: - this.hostsDB.remove(p); + //remove only if hasn't been concurrently modified + this.hostsDB.remove(port, thisHost); break; } } - @Override - public void edgeUpdate(Edge e, UpdateType type, Set props) { + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { switch (type) { case ADDED: // Make sure the props are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { - // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); + } + + //in case of node switch-over to a different cluster controller, + //let's retain edge props + Set currentProps = this.edgesDB.get(e); + if (currentProps != null){ + props.addAll(currentProps); } - // Now make sure thre is the creation timestamp for the - // edge, if not there timestamp with the first update + // Now make sure there is the creation timestamp for the + // edge, if not there, stamp with the first update boolean found_create = false; for (Property prop : props) { if (prop instanceof TimeStamp) { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { found_create = true; + break; } } } if (!found_create) { - TimeStamp t = new TimeStamp(System.currentTimeMillis(), - "creation"); + TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation"); props.add(t); } @@ -463,12 +523,14 @@ public class TopologyManagerImpl implements ITopologyManager, this.edgesDB.put(e, props); // Now populate the DB of NodeConnectors - // NOTE WELL: properties are empy sets, not really needed + // NOTE WELL: properties are empty sets, not really needed // for now. - this.nodeConnectorsDB.put(e.getHeadNodeConnector(), - new HashSet()); - this.nodeConnectorsDB.put(e.getTailNodeConnector(), - new HashSet()); + // The DB only contains ISL ports + if (isISLink(e)) { + this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet(1)); + this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet(1)); + } + log.trace("Edge {} {}", e.toString(), type.name()); break; case REMOVED: // Now remove the edge from edgesDB @@ -483,31 +545,32 @@ public class TopologyManagerImpl implements ITopologyManager, // should be safe to assume that won't happen. this.nodeConnectorsDB.remove(e.getHeadNodeConnector()); this.nodeConnectorsDB.remove(e.getTailNodeConnector()); + log.trace("Edge {} {}", e.toString(), type.name()); break; case CHANGED: - Set old_props = this.edgesDB.get(e); + Set oldProps = this.edgesDB.get(e); // When property changes lets make sure we can change it // all except the creation time stamp because that should // be changed only when the edge is destroyed and created // again - TimeStamp tc = null; - for (Property prop : old_props) { + TimeStamp timeStamp = null; + for (Property prop : oldProps) { if (prop instanceof TimeStamp) { - TimeStamp t = (TimeStamp) prop; - if (t.getTimeStampName().equals("creation")) { - tc = t; + TimeStamp tsProp = (TimeStamp) prop; + if (tsProp.getTimeStampName().equals("creation")) { + timeStamp = tsProp; + break; } } } - // Now lest make sure new properties are non-null - // Make sure the props are non-null + // Now lets make sure new properties are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); } // Now lets remove the creation property if exist in the @@ -518,198 +581,126 @@ public class TopologyManagerImpl implements ITopologyManager, TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { i.remove(); + break; } } } // Now lets add the creation timestamp in it - if (tc != null) { - props.add(tc); + if (timeStamp != null) { + props.add(timeStamp); } // Finally update this.edgesDB.put(e, props); + log.trace("Edge {} {}", e.toString(), type.name()); break; } + return new TopoEdgeUpdate(e, props, type); + } + + @Override + public void edgeUpdate(List topoedgeupdateList) { + List teuList = new ArrayList(); + for (int i = 0; i < topoedgeupdateList.size(); i++) { + Edge e = topoedgeupdateList.get(i).getEdge(); + Set p = topoedgeupdateList.get(i).getProperty(); + UpdateType type = topoedgeupdateList.get(i).getUpdateType(); + TopoEdgeUpdate teu = edgeUpdate(e, type, p); + teuList.add(teu); + } // Now update the listeners for (ITopologyManagerAware s : this.topologyManagerAware) { try { - s.edgeUpdate(e, type, props); + s.edgeUpdate(teuList); } catch (Exception exc) { - log.error("Exception on callback", exc); + log.error("Exception on edge update:", exc); } } + } private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { - TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(link - .getName(), link.getDstSwitchId(), link.getDstPort(), link - .getSrcSwitchId(), link.getSrcPort()); + TopologyUserLinkConfig rLink = new TopologyUserLinkConfig( + link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector()); return getLinkTuple(rLink); } - private Edge getLinkTuple(TopologyUserLinkConfig link) { - Edge linkTuple = null; - Long sID = link.getSrcSwitchIDLong(); - Long dID = link.getDstSwitchIDLong(); - Short srcPort = Short.valueOf((short) 0); - Short dstPort = Short.valueOf((short) 0); - if (link.isSrcPortByName()) { - // TODO find the inventory service to do this, for now 0 - //srcPort = srcSw.getPortNumber(link.getSrcPort()); - } else { - srcPort = Short.parseShort(link.getSrcPort()); - } - if (link.isDstPortByName()) { - //dstPort = dstSw.getPortNumber(link.getDstPort());; - } else { - dstPort = Short.parseShort(link.getDstPort()); - } - - // if atleast 1 link exists for the srcPort and atleast 1 link exists for the dstPort - // that makes it ineligible for the Manual link addition - // This is just an extra protection to avoid mis-programming. - boolean srcLinkExists = false; - boolean dstLinkExists = false; - /** - * Disabling this optimization for now to understand the real benefit of doing this. - * Since this is a Manual Link addition, the user knows what he is doing and it is - * not good to restrict such creativity... - */ - /* - Set links = oneTopology.getLinks().keySet(); - if (links != null) { - for (Edge eLink : links) { - if (!eLink.isUserCreated() && - eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) && - eLink.getSrc().getPort().equals(srcPort)) { - srcLinkExists = true; - } - - if (!eLink.isUserCreated() && - eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) && - eLink.getSrc().getPort().equals(srcPort)) { - dstLinkExists = true; - } - - if (!eLink.isUserCreated() && - eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) && - eLink.getDst().getPort().equals(srcPort)) { - srcLinkExists = true; - } - - if (!eLink.isUserCreated() && - eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) && - eLink.getDst().getPort().equals(srcPort)) { - dstLinkExists = true; - } - } - } - */ - //TODO check a way to validate the port with inventory services - //if (srcSw.getPorts().contains(srcPort) && - //dstSw.getPorts().contains(srcPort) && - if (!srcLinkExists && !dstLinkExists) { - Node sNode = null; - Node dNode = null; - NodeConnector sPort = null; - NodeConnector dPort = null; - linkTuple = null; - try { - sNode = new Node(Node.NodeIDType.OPENFLOW, sID); - dNode = new Node(Node.NodeIDType.OPENFLOW, dID); - sPort = new NodeConnector( - NodeConnector.NodeConnectorIDType.OPENFLOW, srcPort, - sNode); - dPort = new NodeConnector( - NodeConnector.NodeConnectorIDType.OPENFLOW, dstPort, - dNode); - linkTuple = new Edge(sPort, dPort); - } catch (ConstructionException cex) { - } - return linkTuple; - } - - if (srcLinkExists && dstLinkExists) { - link.setStatus(TopologyUserLinkConfig.STATUS.INCORRECT); + private Edge getLinkTuple(TopologyUserLinkConfig link) { + NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector()); + NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector()); + try { + return new Edge(srcNodeConnector, dstNodeConnector); + } catch (Exception e) { + return null; } - return null; } @Override public ConcurrentMap getUserLinks() { - return userLinks; + return new ConcurrentHashMap(userLinksDB); } @Override - public Status addUserLink(TopologyUserLinkConfig link) { - if (!link.isValid()) { - return new Status(StatusCode.BADREQUEST, - "Configuration Invalid. Please check the parameters"); - } - if (userLinks.get(link.getName()) != null) { - return new Status(StatusCode.CONFLICT, - "Link with name : " + link.getName() - + " already exists. Please use another name"); + public Status addUserLink(TopologyUserLinkConfig userLink) { + if (!userLink.isValid()) { + return new Status(StatusCode.BADREQUEST, + "User link configuration invalid."); + } + userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); + + //Check if this link already configured + //NOTE: infinispan cache doesn't support Map.containsValue() + // (which is linear time in most ConcurrentMap impl anyway) + for (TopologyUserLinkConfig existingLink : userLinksDB.values()) { + if (existingLink.equals(userLink)) { + return new Status(StatusCode.CONFLICT, "Link configuration exists"); + } } - if (userLinks.containsValue(link)) { - return new Status(StatusCode.CONFLICT, "Link configuration exists"); + //attempt put, if mapping for this key already existed return conflict + if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) { + return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName() + + " already exists. Please use another name"); } - link.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); - userLinks.put(link.getName(), link); - - Edge linkTuple = getLinkTuple(link); + Edge linkTuple = getLinkTuple(userLink); if (linkTuple != null) { - try { - // TODO The onetopology will be gone too, topology - //manager is the master of the topology at this point - //if (oneTopology.addUserConfiguredLink(linkTuple)) { - linkTuple = getReverseLinkTuple(link); - //if (oneTopology.addUserConfiguredLink(linkTuple)) { - link.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); - //} - //} - } catch (Exception e) { - return new Status(StatusCode.INTERNALERROR, - "Exception while adding custom link : " + - e.getMessage()); + if (!isProductionLink(linkTuple)) { + edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); + } + + linkTuple = getReverseLinkTuple(userLink); + if (linkTuple != null) { + userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); + if (!isProductionLink(linkTuple)) { + edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); + } } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } @Override public Status deleteUserLink(String linkName) { if (linkName == null) { - return new Status(StatusCode.BADREQUEST, - "A valid linkName is required to Delete a link"); + return new Status(StatusCode.BADREQUEST, "User link name cannot be null."); } - TopologyUserLinkConfig link = userLinks.get(linkName); - - Edge linkTuple = getLinkTuple(link); - userLinks.remove(linkName); - if (linkTuple != null) { - try { - //oneTopology.deleteUserConfiguredLink(linkTuple); - } catch (Exception e) { - log - .warn("Harmless : Exception while Deleting User Configured link " - + link + " " + e.toString()); + TopologyUserLinkConfig link = userLinksDB.remove(linkName); + Edge linkTuple; + if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) { + if (! isProductionLink(linkTuple)) { + edgeUpdate(linkTuple, UpdateType.REMOVED, null); } + linkTuple = getReverseLinkTuple(link); - try { - //oneTopology.deleteUserConfiguredLink(linkTuple); - } catch (Exception e) { - log - .error("Harmless : Exception while Deleting User Configured Reverse link " - + link + " " + e.toString()); + if (! isProductionLink(linkTuple)) { + edgeUpdate(linkTuple, UpdateType.REMOVED, null); } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } private void registerWithOSGIConsole() { @@ -723,67 +714,57 @@ public class TopologyManagerImpl implements ITopologyManager, public String getHelp() { StringBuffer help = new StringBuffer(); help.append("---Topology Manager---\n"); - help - .append("\t addTopo name \n"); - help.append("\t delTopo name\n"); - help.append("\t _printTopo\n"); + help.append("\t addUserLink \n"); + help.append("\t deleteUserLink \n"); + help.append("\t printUserLink\n"); + help.append("\t printNodeEdges\n"); return help.toString(); } - public void _printTopo(CommandInterpreter ci) { - for (String name : this.userLinks.keySet()) { - ci.println(name + " : " + userLinks.get(name)); + public void _printUserLink(CommandInterpreter ci) { + for (String name : this.userLinksDB.keySet()) { + TopologyUserLinkConfig linkConfig = userLinksDB.get(name); + ci.println("Name : " + name); + ci.println(linkConfig); + ci.println("Edge " + getLinkTuple(linkConfig)); + ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig)); } } - public void _addTopo(CommandInterpreter ci) { + public void _addUserLink(CommandInterpreter ci) { String name = ci.nextArgument(); if ((name == null)) { ci.println("Please enter a valid Name"); return; } - String dpid = ci.nextArgument(); - if (dpid == null) { - ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx"); - return; - } - try { - HexEncode.stringToLong(dpid); - } catch (Exception e) { - ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx"); + String ncStr1 = ci.nextArgument(); + if (ncStr1 == null) { + ci.println("Please enter two node connector strings"); return; } - - String port = ci.nextArgument(); - if (port == null) { - ci.println("Invalid port number"); + String ncStr2 = ci.nextArgument(); + if (ncStr2 == null) { + ci.println("Please enter second node connector string"); return; } - String ddpid = ci.nextArgument(); - if (ddpid == null) { - ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx"); + NodeConnector nc1 = NodeConnector.fromString(ncStr1); + if (nc1 == null) { + ci.println("Invalid input node connector 1 string: " + ncStr1); return; } - try { - HexEncode.stringToLong(ddpid); - } catch (Exception e) { - ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx"); + NodeConnector nc2 = NodeConnector.fromString(ncStr2); + if (nc2 == null) { + ci.println("Invalid input node connector 2 string: " + ncStr2); return; } - String dport = ci.nextArgument(); - if (dport == null) { - ci.println("Invalid port number"); - return; - } - TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, - dpid, port, ddpid, dport); + TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2); ci.println(this.addUserLink(config)); } - public void _delTopo(CommandInterpreter ci) { + public void _deleteUserLink(CommandInterpreter ci) { String name = ci.nextArgument(); if ((name == null)) { ci.println("Please enter a valid Name"); @@ -792,10 +773,30 @@ public class TopologyManagerImpl implements ITopologyManager, this.deleteUserLink(name); } + public void _printNodeEdges(CommandInterpreter ci) { + Map> nodeEdges = getNodeEdges(); + if (nodeEdges == null) { + return; + } + Set nodeSet = nodeEdges.keySet(); + if (nodeSet == null) { + return; + } + ci.println(" Node Edge"); + for (Node node : nodeSet) { + Set edgeSet = nodeEdges.get(node); + if (edgeSet == null) { + continue; + } + for (Edge edge : edgeSet) { + ci.println(node + " " + edge); + } + } + } + @Override public Object readObject(ObjectInputStream ois) throws FileNotFoundException, IOException, ClassNotFoundException { - // TODO Auto-generated method stub return ois.readObject(); } @@ -806,12 +807,100 @@ public class TopologyManagerImpl implements ITopologyManager, @Override public void edgeOverUtilized(Edge edge) { - log.warn("Link Utilization above normal: " + edge); + log.warn("Link Utilization above normal: {}", edge); } @Override public void edgeUtilBackToNormal(Edge edge) { - log.warn("Link Utilization back to normal: " + edge); + log.warn("Link Utilization back to normal: {}", edge); } + private void edgeUpdateClusterWide(Edge e, UpdateType type, Set props, boolean isLocal) { + TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type); + upd.setLocal(isLocal); + notifyQ.add(upd); + } + + @Override + public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + // This is the case of an Edge being added to the topology DB + final Edge e = (Edge) key; + log.trace("Edge {} CREATED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal); + } + } + + @Override + public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} CHANGED isLocal:{}", e, originLocal); + final Set props = (Set) new_value; + edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal); + } + } + + @Override + public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} DELETED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal); + } + } + + class TopologyNotify implements Runnable { + private final BlockingQueue notifyQ; + private TopoEdgeUpdate entry; + private List teuList = new ArrayList(); + private boolean notifyListeners; + + TopologyNotify(BlockingQueue notifyQ) { + this.notifyQ = notifyQ; + } + + @Override + public void run() { + while (true) { + try { + log.trace("New run of TopologyNotify"); + notifyListeners = false; + // First we block waiting for an element to get in + entry = notifyQ.take(); + // Then we drain the whole queue if elements are + // in it without getting into any blocking condition + for (; entry != null; entry = notifyQ.poll()) { + teuList.add(entry); + notifyListeners = true; + } + + // Notify listeners only if there were updates drained else + // give up + if (notifyListeners) { + log.trace("Notifier thread, notified a listener"); + // Now update the listeners + for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) { + try { + s.edgeUpdate(teuList); + } catch (Exception exc) { + log.error("Exception on edge update:", exc); + } + } + } + teuList.clear(); + + // Lets sleep for sometime to allow aggregation of event + Thread.sleep(100); + } catch (InterruptedException e1) { + log.warn("TopologyNotify interrupted {}", e1.getMessage()); + if (shuttingDown) { + return; + } + } catch (Exception e2) { + log.error("", e2); + } + } + } + } }