X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Ftopologymanager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Ftopologymanager%2Finternal%2FTopologyManagerImpl.java;h=4972d3b5b5d73d41df3cfbbb99543db8fde2eb56;hp=277465813baef5d1e183f36ac64efad92cf7fb0c;hb=c11fedfa1195b5d17cb2493fba1a4f0fde862ce1;hpb=84e8316159f90f224f75e86a606c525e53b2ff7a diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 277465813b..4972d3b5b5 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -12,17 +12,20 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; -import java.util.Collections; +import java.util.Date; import java.util.Dictionary; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.felix.dm.Component; @@ -30,30 +33,29 @@ import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; -import org.opendaylight.controller.sal.core.ConstructionException; import org.opendaylight.controller.sal.core.Edge; import org.opendaylight.controller.sal.core.Host; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; -import org.opendaylight.controller.sal.core.NodeConnector.NodeConnectorIDType; import org.opendaylight.controller.sal.core.Property; import org.opendaylight.controller.sal.core.TimeStamp; import org.opendaylight.controller.sal.core.UpdateType; -import org.opendaylight.controller.sal.core.Node.NodeIDType; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; import org.opendaylight.controller.sal.topology.TopoEdgeUpdate; -import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; +import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware; import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; @@ -65,34 +67,47 @@ import org.slf4j.LoggerFactory; * network topology. It provides service for applications to interact with * topology database and notifies all the listeners of topology changes. */ -public class TopologyManagerImpl implements ITopologyManager, -IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, -CommandProvider { - private static final Logger log = LoggerFactory - .getLogger(TopologyManagerImpl.class); - private ITopologyService topoService = null; - private IClusterContainerServices clusterContainerService = null; +public class TopologyManagerImpl implements + ICacheUpdateAware, + ITopologyManager, + IConfigurationContainerAware, + IListenTopoUpdates, + IObjectReader, + CommandProvider { + static final String TOPOEDGESDB = "topologymanager.edgesDB"; + static final String TOPOHOSTSDB = "topologymanager.hostsDB"; + static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB"; + static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; + private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final String SAVE = "Save"; + private ITopologyService topoService; + private IClusterContainerServices clusterContainerService; // DB of all the Edges with properties which constitute our topology - private ConcurrentMap> edgesDB = null; + private ConcurrentMap> edgesDB; // DB of all NodeConnector which are part of ISL Edges, meaning they // are connected to another NodeConnector on the other side of an ISL link. // NodeConnector of a Production Edge is not part of this DB. - private ConcurrentMap> nodeConnectorsDB = null; + private ConcurrentMap> nodeConnectorsDB; // DB of all the NodeConnectors with an Host attached to it - private ConcurrentMap>> hostsDB = null; + private ConcurrentMap>> hostsDB; // Topology Manager Aware listeners - private Set topologyManagerAware = Collections - .synchronizedSet(new HashSet()); - + private Set topologyManagerAware = new CopyOnWriteArraySet(); + // Topology Manager Aware listeners - for clusterwide updates + private Set topologyManagerClusterWideAware = + new CopyOnWriteArraySet(); private static String ROOT = GlobalConstants.STARTUPHOME.toString(); - private String userLinksFileName = null; - private ConcurrentMap userLinks; + private String userLinksFileName; + private ConcurrentMap userLinksDB; + private BlockingQueue notifyQ = new LinkedBlockingQueue(); + private volatile Boolean shuttingDown = false; + private Thread notifyThread; + void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); hostsDB = new ConcurrentHashMap>>(); - userLinks = new ConcurrentHashMap(); nodeConnectorsDB = new ConcurrentHashMap>(); + userLinksDB = new ConcurrentHashMap(); } void setTopologyManagerAware(ITopologyManagerAware s) { @@ -109,6 +124,20 @@ CommandProvider { } } + void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Adding ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.add(s); + } + } + + void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) { + if (this.topologyManagerClusterWideAware != null) { + log.debug("Removing ITopologyManagerClusterWideAware: {}", s); + this.topologyManagerClusterWideAware.remove(s); + } + } + void setTopoService(ITopologyService s) { log.debug("Adding ITopologyService: {}", s); this.topoService = s; @@ -139,8 +168,10 @@ CommandProvider { * */ void init(Component c) { + allocateCaches(); + retrieveCaches(); String containerName = null; - Dictionary props = c.getServiceProperties(); + Dictionary props = c.getServiceProperties(); if (props != null) { containerName = (String) props.get("containerName"); } else { @@ -148,57 +179,86 @@ CommandProvider { containerName = "UNKNOWN"; } - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - return; - } + userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; + registerWithOSGIConsole(); + loadConfiguration(); + // Restore the shuttingDown status on init of the component + shuttingDown = false; + notifyThread = new Thread(new TopologyNotify(notifyQ)); + } - if (this.topoService == null) { - log.error("Topology Services is null, not expected!"); - return; + @SuppressWarnings({ "unchecked", "deprecation" }) + private void allocateCaches() { + try { + this.edgesDB = + (ConcurrentMap>) this.clusterContainerService.createCache(TOPOEDGESDB, + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + } catch (CacheExistException cee) { + log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed"); + } catch (CacheConfigException cce) { + log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode"); } try { - this.edgesDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.edgesDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.createCache( + TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.edgesDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.edgesDB Cache configuration invalid - " - + "check cache mode"); + log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode"); } try { - this.hostsDB = (ConcurrentMap>>) this.clusterContainerService - .createCache("topologymanager.hostsDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.createCache( + TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.hostsDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.hostsDB Cache configuration invalid - " - + "check cache mode"); + log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode"); } try { - this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.nodeConnectorDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.createCache( + TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.nodeConnectorDB Cache already exists" - + " - destroy and recreate if needed"); + log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.nodeConnectorDB Cache configuration " - + "invalid - check cache mode"); + log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode"); } + } - userLinks = new ConcurrentHashMap(); + @SuppressWarnings({ "unchecked", "deprecation" }) + private void retrieveCaches() { + if (this.clusterContainerService == null) { + log.error("Cluster Services is null, can't retrieve caches."); + return; + } - userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; - registerWithOSGIConsole(); - loadConfiguration(); + this.edgesDB = (ConcurrentMap>) this.clusterContainerService.getCache(TOPOEDGESDB); + if (edgesDB == null) { + log.error("Failed to get cache for " + TOPOEDGESDB); + } + + this.hostsDB = + (ConcurrentMap>>) this.clusterContainerService.getCache(TOPOHOSTSDB); + if (hostsDB == null) { + log.error("Failed to get cache for " + TOPOHOSTSDB); + } + + this.nodeConnectorsDB = + (ConcurrentMap>) this.clusterContainerService.getCache(TOPONODECONNECTORDB); + if (nodeConnectorsDB == null) { + log.error("Failed to get cache for " + TOPONODECONNECTORDB); + } + + this.userLinksDB = + (ConcurrentMap) this.clusterContainerService.getCache(TOPOUSERLINKSDB); + if (userLinksDB == null) { + log.error("Failed to get cache for " + TOPOUSERLINKSDB); + } } /** @@ -207,12 +267,19 @@ CommandProvider { * */ void started() { + // Start the batcher thread for the cluster wide topology updates + notifyThread.start(); // SollicitRefresh MUST be called here else if called at init // time it may sollicit refresh too soon. log.debug("Sollicit topology refresh"); topoService.sollicitRefresh(); } + void stop() { + shuttingDown = true; + notifyThread.interrupt(); + } + /** * Function called by the dependency manager when at least one dependency * become unsatisfied or when the component is shutting down because for @@ -220,62 +287,38 @@ CommandProvider { * */ void destroy() { - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - this.edgesDB = null; - this.hostsDB = null; - this.nodeConnectorsDB = null; - return; - } - this.clusterContainerService.destroyCache("topologymanager.edgesDB"); - this.edgesDB = null; - this.clusterContainerService.destroyCache("topologymanager.hostsDB"); - this.hostsDB = null; - this.clusterContainerService - .destroyCache("topologymanager.nodeConnectorDB"); - this.nodeConnectorsDB = null; - log.debug("Topology Manager DB Deallocated"); + notifyQ.clear(); + notifyThread = null; } @SuppressWarnings("unchecked") private void loadConfiguration() { ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = (ConcurrentMap) objReader - .read(this, userLinksFileName); + ConcurrentMap confList = + (ConcurrentMap) objReader.read(this, userLinksFileName); - if (confList == null) { - return; - } - - for (TopologyUserLinkConfig conf : confList.values()) { - addUserLink(conf); + if (confList != null) { + for (TopologyUserLinkConfig conf : confList.values()) { + addUserLink(conf); + } } } @Override public Status saveConfig() { - // Publish the save config event to the cluster nodes - /** - * Get the CLUSTERING SERVICES WORKING BEFORE TRYING THIS - * - * configSaveEvent.put(new Date().getTime(), SAVE); - */ return saveConfigInternal(); } public Status saveConfigInternal() { - Status retS; ObjectWriter objWriter = new ObjectWriter(); - retS = objWriter - .write(new ConcurrentHashMap( - userLinks), userLinksFileName); + Status saveStatus = objWriter.write( + new ConcurrentHashMap(userLinksDB), userLinksFileName); - if (retS.isSuccess()) { - return retS; - } else { - return new Status(StatusCode.INTERNALERROR, "Save failed"); + if (! saveStatus.isSuccess()) { + return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription()); } + return saveStatus; } @Override @@ -284,31 +327,25 @@ CommandProvider { return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> res = new HashMap>(); + for (Edge edge : this.edgesDB.keySet()) { // Lets analyze the tail - Node node = key.getTailNodeConnector().getNode(); + Node node = edge.getTailNodeConnector().getNode(); Set nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); // Lets analyze the head - node = key.getHeadNodeConnector().getNode(); + node = edge.getHeadNodeConnector().getNode(); nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); } return res; @@ -344,10 +381,8 @@ CommandProvider { * @return true if it is a production link */ public boolean isProductionLink(Edge e) { - return (e.getHeadNodeConnector().getType() - .equals(NodeConnector.NodeConnectorIDType.PRODUCTION) || e - .getTailNodeConnector().getType() - .equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); + return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION) + || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); } /** @@ -363,31 +398,20 @@ CommandProvider { return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> edgeMap = new HashMap>(); + Set props; + for (Map.Entry> edgeEntry : edgesDB.entrySet()) { // Sets of props are copied because the composition of // those properties could change with time - HashSet prop = new HashSet( - this.edgesDB.get(key)); + props = new HashSet(edgeEntry.getValue()); // We can simply reuse the key because the object is // immutable so doesn't really matter that we are // referencing the only owned by a different table, the // meaning is the same because doesn't change with time. - res.put(key, prop); + edgeMap.put(edgeEntry.getKey(), props); } - return res; - } - - // TODO remove with spring-dm removal - /** - * @param set - * the topologyAware to set - */ - public void setTopologyAware(Set set) { - for (Object s : set) { - setTopologyManagerAware((ITopologyManagerAware) s); - } + return edgeMap; } @Override @@ -396,7 +420,7 @@ CommandProvider { return null; } - return (this.hostsDB.keySet()); + return (new HashSet(this.hostsDB.keySet())); } @Override @@ -405,88 +429,92 @@ CommandProvider { return null; } HashMap> res = new HashMap>(); - - for (NodeConnector p : this.hostsDB.keySet()) { - Node n = p.getNode(); - Set pSet = res.get(n); - if (pSet == null) { + Node node; + Set portSet; + for (NodeConnector nc : this.hostsDB.keySet()) { + node = nc.getNode(); + portSet = res.get(node); + if (portSet == null) { // Create the HashSet if null - pSet = new HashSet(); - res.put(n, pSet); + portSet = new HashSet(); + res.put(node, portSet); } // Keep updating the HashSet, given this is not a // clustered map we can just update the set without // worrying to update the hashmap. - pSet.add(p); + portSet.add(nc); } return (res); } @Override - public Host getHostAttachedToNodeConnector(NodeConnector p) { - if (this.hostsDB == null) { + public Host getHostAttachedToNodeConnector(NodeConnector port) { + ImmutablePair> host; + if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) { return null; } - - return (this.hostsDB.get(p).getLeft()); + return host.getLeft(); } @Override - public void updateHostLink(NodeConnector p, Host h, UpdateType t, - Set props) { - if (this.hostsDB == null) { - return; + public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { + + // Clone the property set in case non null else just + // create an empty one. Caches allocated via infinispan + // don't allow null values + if (props == null) { + props = new HashSet(); + } else { + props = new HashSet(props); } + ImmutablePair> thisHost = new ImmutablePair>(h, props); switch (t) { case ADDED: case CHANGED: - // Clone the property set in case non null else just - // create an empty one. Caches allocated via infinispan - // don't allow null values - if (props == null) { - props = new HashSet(); - } else { - props = new HashSet(props); - } - - this.hostsDB.put(p, new ImmutablePair(h, props)); + this.hostsDB.put(port, thisHost); break; case REMOVED: - this.hostsDB.remove(p); + //remove only if hasn't been concurrently modified + this.hostsDB.remove(port, thisHost); break; } } - private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, - Set props) { + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { switch (type) { case ADDED: // Make sure the props are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { - // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); + } + + //in case of node switch-over to a different cluster controller, + //let's retain edge props + Set currentProps = this.edgesDB.get(e); + if (currentProps != null){ + props.addAll(currentProps); } // Now make sure there is the creation timestamp for the - // edge, if not there timestamp with the first update + // edge, if not there, stamp with the first update boolean found_create = false; for (Property prop : props) { if (prop instanceof TimeStamp) { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { found_create = true; + break; } } } if (!found_create) { - TimeStamp t = new TimeStamp(System.currentTimeMillis(), - "creation"); + TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation"); props.add(t); } @@ -499,10 +527,8 @@ CommandProvider { // for now. // The DB only contains ISL ports if (isISLink(e)) { - this.nodeConnectorsDB.put(e.getHeadNodeConnector(), - new HashSet()); - this.nodeConnectorsDB.put(e.getTailNodeConnector(), - new HashSet()); + this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet(1)); + this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet(1)); } log.trace("Edge {} {}", e.toString(), type.name()); break; @@ -522,29 +548,29 @@ CommandProvider { log.trace("Edge {} {}", e.toString(), type.name()); break; case CHANGED: - Set old_props = this.edgesDB.get(e); + Set oldProps = this.edgesDB.get(e); // When property changes lets make sure we can change it // all except the creation time stamp because that should // be changed only when the edge is destroyed and created // again - TimeStamp tc = null; - for (Property prop : old_props) { + TimeStamp timeStamp = null; + for (Property prop : oldProps) { if (prop instanceof TimeStamp) { - TimeStamp t = (TimeStamp) prop; - if (t.getTimeStampName().equals("creation")) { - tc = t; + TimeStamp tsProp = (TimeStamp) prop; + if (tsProp.getTimeStampName().equals("creation")) { + timeStamp = tsProp; + break; } } } // Now lets make sure new properties are non-null - // Make sure the props are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); } // Now lets remove the creation property if exist in the @@ -555,13 +581,14 @@ CommandProvider { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { i.remove(); + break; } } } // Now lets add the creation timestamp in it - if (tc != null) { - props.add(tc); + if (timeStamp != null) { + props.add(timeStamp); } // Finally update @@ -588,7 +615,7 @@ CommandProvider { try { s.edgeUpdate(teuList); } catch (Exception exc) { - log.error("Exception on callback", exc); + log.error("Exception on edge update:", exc); } } @@ -602,79 +629,78 @@ CommandProvider { private Edge getLinkTuple(TopologyUserLinkConfig link) { - Edge linkTuple = null; NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector()); NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector()); - if (srcNodeConnector == null || dstNodeConnector == null) return null; try { - linkTuple = new Edge(srcNodeConnector, dstNodeConnector); - } catch (Exception e) { + return new Edge(srcNodeConnector, dstNodeConnector); + } catch (Exception e) { + return null; } - return linkTuple; } @Override public ConcurrentMap getUserLinks() { - return userLinks; + return new ConcurrentHashMap(userLinksDB); } @Override - public Status addUserLink(TopologyUserLinkConfig link) { - if (!link.isValid()) { + public Status addUserLink(TopologyUserLinkConfig userLink) { + if (!userLink.isValid()) { return new Status(StatusCode.BADREQUEST, - "Configuration Invalid. Please check the parameters"); + "User link configuration invalid."); } - if (userLinks.get(link.getName()) != null) { - return new Status(StatusCode.CONFLICT, "Link with name : " - + link.getName() - + " already exists. Please use another name"); + userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); + + //Check if this link already configured + //NOTE: infinispan cache doesn't support Map.containsValue() + // (which is linear time in most ConcurrentMap impl anyway) + for (TopologyUserLinkConfig existingLink : userLinksDB.values()) { + if (existingLink.equals(userLink)) { + return new Status(StatusCode.CONFLICT, "Link configuration exists"); + } } - if (userLinks.containsValue(link)) { - return new Status(StatusCode.CONFLICT, "Link configuration exists"); + //attempt put, if mapping for this key already existed return conflict + if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) { + return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName() + + " already exists. Please use another name"); } - link.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); - userLinks.put(link.getName(), link); - - Edge linkTuple = getLinkTuple(link); + Edge linkTuple = getLinkTuple(userLink); if (linkTuple != null) { if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } - linkTuple = getReverseLinkTuple(link); + linkTuple = getReverseLinkTuple(userLink); if (linkTuple != null) { - link.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); + userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } @Override public Status deleteUserLink(String linkName) { if (linkName == null) { - return new Status(StatusCode.BADREQUEST, - "A valid linkName is required to Delete a link"); + return new Status(StatusCode.BADREQUEST, "User link name cannot be null."); } - TopologyUserLinkConfig link = userLinks.get(linkName); - - Edge linkTuple = getLinkTuple(link); - userLinks.remove(linkName); - if (linkTuple != null) { - if (!isProductionLink(linkTuple)) { + TopologyUserLinkConfig link = userLinksDB.remove(linkName); + Edge linkTuple; + if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) { + if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } linkTuple = getReverseLinkTuple(link); - if ((linkTuple != null) && !isProductionLink(linkTuple)) { + if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } private void registerWithOSGIConsole() { @@ -696,8 +722,8 @@ CommandProvider { } public void _printUserLink(CommandInterpreter ci) { - for (String name : this.userLinks.keySet()) { - TopologyUserLinkConfig linkConfig = userLinks.get(name); + for (String name : this.userLinksDB.keySet()) { + TopologyUserLinkConfig linkConfig = userLinksDB.get(name); ci.println("Name : " + name); ci.println(linkConfig); ci.println("Edge " + getLinkTuple(linkConfig)); @@ -771,7 +797,6 @@ CommandProvider { @Override public Object readObject(ObjectInputStream ois) throws FileNotFoundException, IOException, ClassNotFoundException { - // TODO Auto-generated method stub return ois.readObject(); } @@ -790,4 +815,92 @@ CommandProvider { log.warn("Link Utilization back to normal: {}", edge); } + private void edgeUpdateClusterWide(Edge e, UpdateType type, Set props, boolean isLocal) { + TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type); + upd.setLocal(isLocal); + notifyQ.add(upd); + } + + @Override + public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + // This is the case of an Edge being added to the topology DB + final Edge e = (Edge) key; + log.trace("Edge {} CREATED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal); + } + } + + @Override + public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} CHANGED isLocal:{}", e, originLocal); + final Set props = (Set) new_value; + edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal); + } + } + + @Override + public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) { + if (cacheName.equals(TOPOEDGESDB)) { + final Edge e = (Edge) key; + log.trace("Edge {} DELETED isLocal:{}", e, originLocal); + edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal); + } + } + + class TopologyNotify implements Runnable { + private final BlockingQueue notifyQ; + private TopoEdgeUpdate entry; + private List teuList = new ArrayList(); + private boolean notifyListeners; + + TopologyNotify(BlockingQueue notifyQ) { + this.notifyQ = notifyQ; + } + + @Override + public void run() { + while (true) { + try { + log.trace("New run of TopologyNotify"); + notifyListeners = false; + // First we block waiting for an element to get in + entry = notifyQ.take(); + // Then we drain the whole queue if elements are + // in it without getting into any blocking condition + for (; entry != null; entry = notifyQ.poll()) { + teuList.add(entry); + notifyListeners = true; + } + + // Notify listeners only if there were updates drained else + // give up + if (notifyListeners) { + log.trace("Notifier thread, notified a listener"); + // Now update the listeners + for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) { + try { + s.edgeUpdate(teuList); + } catch (Exception exc) { + log.error("Exception on edge update:", exc); + } + } + } + teuList.clear(); + + // Lets sleep for sometime to allow aggregation of event + Thread.sleep(100); + } catch (InterruptedException e1) { + log.warn("TopologyNotify interrupted {}", e1.getMessage()); + if (shuttingDown) { + return; + } + } catch (Exception e2) { + log.error("", e2); + } + } + } + } }