From 5d061e05111778938c9d8e0e05f09a8c99835594 Mon Sep 17 00:00:00 2001 From: Yevgeny Khodorkovsky Date: Wed, 17 Jul 2013 11:12:10 -0700 Subject: [PATCH] HA - Cache synch for Topology Manager - Allocate/retrieve cluster cache for topology mgr caches - Remove destroyCache from bundle shutdown callbacks in HostTracker, TopologyMgr and StaticRouting - Style fixes Change-Id: I352f5cb4cfab8fd06b15be708cf07ae1ef64c9bf Signed-off-by: Yevgeny Khodorkovsky --- .../arphandler/internal/ArpHandler.java | 10 +- .../internal/StaticRoutingImplementation.java | 19 +- .../hosttracker/internal/HostTracker.java | 12 - .../controller/sal/utils/ObjectWriter.java | 2 +- .../TopologyUserLinkConfig.java | 10 +- .../topologymanager/internal/Activator.java | 13 +- .../internal/TopologyManagerImpl.java | 416 +++++++++--------- 7 files changed, 233 insertions(+), 249 deletions(-) diff --git a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java index 6e625a0eb7..811c7aca83 100644 --- a/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java +++ b/opendaylight/arphandler/src/main/java/org/opendaylight/controller/arphandler/internal/ArpHandler.java @@ -21,9 +21,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.hosttracker.IfHostListener; import org.opendaylight.controller.hosttracker.IfIptoHost; @@ -47,6 +45,8 @@ import org.opendaylight.controller.sal.utils.NetUtils; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.switchmanager.Subnet; import org.opendaylight.controller.topologymanager.ITopologyManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ArpHandler implements IHostFinder, IListenDataPacket { private static final Logger logger = LoggerFactory @@ -57,8 +57,8 @@ public class ArpHandler implements IHostFinder, IListenDataPacket { private IDataPacketService dataPacketService = null; private Set hostListener = Collections .synchronizedSet(new HashSet()); - private ConcurrentHashMap> arpRequestors; - private ConcurrentHashMap countDownTimers; + private ConcurrentMap> arpRequestors; + private ConcurrentMap countDownTimers; private Timer periodicTimer; void setHostListener(IfHostListener s) { diff --git a/opendaylight/forwarding/staticrouting/src/main/java/org/opendaylight/controller/forwarding/staticrouting/internal/StaticRoutingImplementation.java b/opendaylight/forwarding/staticrouting/src/main/java/org/opendaylight/controller/forwarding/staticrouting/internal/StaticRoutingImplementation.java index 8819be3988..4afd4fb8e4 100644 --- a/opendaylight/forwarding/staticrouting/src/main/java/org/opendaylight/controller/forwarding/staticrouting/internal/StaticRoutingImplementation.java +++ b/opendaylight/forwarding/staticrouting/src/main/java/org/opendaylight/controller/forwarding/staticrouting/internal/StaticRoutingImplementation.java @@ -44,12 +44,12 @@ import org.opendaylight.controller.forwarding.staticrouting.StaticRouteConfig; import org.opendaylight.controller.hosttracker.IfIptoHost; import org.opendaylight.controller.hosttracker.IfNewHostNotify; import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector; -import org.opendaylight.controller.sal.utils.StatusCode; import org.opendaylight.controller.sal.utils.GlobalConstants; import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.ObjectReader; import org.opendaylight.controller.sal.utils.ObjectWriter; import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -206,22 +206,6 @@ public class StaticRoutingImplementation implements IfNewHostNotify, } } - @SuppressWarnings("deprecation") - private void destroyCaches() { - if (this.clusterContainerService == null) { - log - .info("un-initialized clusterContainerService, can't destroy cache"); - return; - } - - clusterContainerService.destroyCache("forwarding.staticrouting.routes"); - clusterContainerService - .destroyCache("forwarding.staticrouting.configs"); - clusterContainerService - .destroyCache("forwarding.staticrouting.configSaveEvent"); - - } - @Override public void entryCreated(Long key, String cacheName, boolean local) { } @@ -488,7 +472,6 @@ public class StaticRoutingImplementation implements IfNewHostNotify, log.debug("Destroy all the Static Routing Rules given we are " + "shutting down"); - destroyCaches(); gatewayProbeTimer.cancel(); // Clear the listener so to be ready in next life diff --git a/opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/HostTracker.java b/opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/HostTracker.java index 8468c5b284..6b7bac03e9 100644 --- a/opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/HostTracker.java +++ b/opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/HostTracker.java @@ -212,17 +212,6 @@ public class HostTracker implements IfIptoHost, IfHostListener, ISwitchManagerAw inactiveStaticHosts = new ConcurrentHashMap(); } - @SuppressWarnings("deprecation") - private void destroyCache() { - if (this.clusterContainerService == null) { - logger.error("un-initialized clusterMger, can't destroy cache"); - return; - } - logger.debug("Destroying Cache for HostTracker"); - this.clusterContainerService.destroyCache("hostTrackerAH"); - this.clusterContainerService.destroyCache("hostTrackerIH"); - nonClusterObjectCreate(); - } public void shutDown() { } @@ -1343,7 +1332,6 @@ public class HostTracker implements IfIptoHost, IfHostListener, ISwitchManagerAw * */ void destroy() { - destroyCache(); } /** diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/ObjectWriter.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/ObjectWriter.java index 730a9b0368..8e1dcd69f9 100644 --- a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/ObjectWriter.java +++ b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/ObjectWriter.java @@ -61,6 +61,6 @@ public class ObjectWriter { } } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } } diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/TopologyUserLinkConfig.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/TopologyUserLinkConfig.java index f485fdd0db..1b1e6f49d1 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/TopologyUserLinkConfig.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/TopologyUserLinkConfig.java @@ -10,19 +10,13 @@ package org.opendaylight.controller.topologymanager; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.opendaylight.controller.sal.core.Node; -import org.opendaylight.controller.sal.core.Node.NodeIDType; -import org.opendaylight.controller.sal.core.NodeConnector.NodeConnectorIDType; import org.opendaylight.controller.sal.core.NodeConnector; -import org.opendaylight.controller.sal.utils.GUIField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,7 +123,7 @@ public class TopologyUserLinkConfig implements Serializable { if (!isValidNodeConnector(srcNodeConnector) || !isValidNodeConnector(dstNodeConnector)) { - logger.warn("Invalid NodeConnector"); + logger.debug("Invalid NodeConnector in user link: {}", this); return false; } diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java index d0d9bfaa5e..0da1a2ee39 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java @@ -9,8 +9,12 @@ package org.opendaylight.controller.topologymanager.internal; -import org.apache.felix.dm.Component; +import java.util.Dictionary; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Set; +import org.apache.felix.dm.Component; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; @@ -74,9 +78,14 @@ public class Activator extends ComponentActivatorAbstractBase { public void configureInstance(Component c, Object imp, String containerName) { if (imp.equals(TopologyManagerImpl.class)) { // export the service needed to listen to topology updates + Dictionary> props = new Hashtable>(); + Set propSet = new HashSet(); + propSet.add("topologymanager.configSaveEvent"); + props.put("cachenames", propSet); + c.setInterface(new String[] { IListenTopoUpdates.class.getName(), ITopologyManager.class.getName(), - IConfigurationContainerAware.class.getName() }, null); + IConfigurationContainerAware.class.getName() }, props); c.add(createContainerServiceDependency(containerName).setService( ITopologyService.class).setCallbacks("setTopoService", diff --git a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index ce39fed5fc..a043cbe925 100644 --- a/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/topologymanager/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -12,7 +12,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; -import java.util.Collections; +import java.util.Date; import java.util.Dictionary; import java.util.EnumSet; import java.util.HashMap; @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.felix.dm.Component; @@ -65,31 +66,34 @@ import org.slf4j.LoggerFactory; public class TopologyManagerImpl implements ITopologyManager, IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, CommandProvider { - private static final Logger log = LoggerFactory - .getLogger(TopologyManagerImpl.class); - private ITopologyService topoService = null; - private IClusterContainerServices clusterContainerService = null; + private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final String SAVE = "Save"; + private ITopologyService topoService; + private IClusterContainerServices clusterContainerService; // DB of all the Edges with properties which constitute our topology - private ConcurrentMap> edgesDB = null; + private ConcurrentMap> edgesDB; // DB of all NodeConnector which are part of ISL Edges, meaning they // are connected to another NodeConnector on the other side of an ISL link. // NodeConnector of a Production Edge is not part of this DB. - private ConcurrentMap> nodeConnectorsDB = null; + private ConcurrentMap> nodeConnectorsDB; // DB of all the NodeConnectors with an Host attached to it - private ConcurrentMap>> hostsDB = null; + private ConcurrentMap>> hostsDB; // Topology Manager Aware listeners - private Set topologyManagerAware = Collections - .synchronizedSet(new HashSet()); + private Set topologyManagerAware = + new CopyOnWriteArraySet();; private static String ROOT = GlobalConstants.STARTUPHOME.toString(); - private String userLinksFileName = null; - private ConcurrentMap userLinks; + private String userLinksFileName; + private ConcurrentMap userLinksDB; + private ConcurrentMap configSaveEvent; + void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); hostsDB = new ConcurrentHashMap>>(); - userLinks = new ConcurrentHashMap(); nodeConnectorsDB = new ConcurrentHashMap>(); + userLinksDB = new ConcurrentHashMap(); + configSaveEvent = new ConcurrentHashMap(); } void setTopologyManagerAware(ITopologyManagerAware s) { @@ -136,8 +140,12 @@ CommandProvider { * */ void init(Component c) { + + allocateCaches(); + retrieveCaches(); + String containerName = null; - Dictionary props = c.getServiceProperties(); + Dictionary props = c.getServiceProperties(); if (props != null) { containerName = (String) props.get("containerName"); } else { @@ -145,57 +153,103 @@ CommandProvider { containerName = "UNKNOWN"; } - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - return; - } + userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; + registerWithOSGIConsole(); + loadConfiguration(); + } - if (this.topoService == null) { - log.error("Topology Services is null, not expected!"); + @SuppressWarnings({ "unchecked", "deprecation" }) + private void allocateCaches(){ + if (this.clusterContainerService == null) { + nonClusterObjectCreate(); + log.error("Cluster Services unavailable, allocated non-cluster caches!"); return; } try { - this.edgesDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.edgesDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + this.edgesDB = (ConcurrentMap>) this.clusterContainerService.createCache( + "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.edgesDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.edgesDB Cache configuration invalid - " - + "check cache mode"); + log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode"); } try { this.hostsDB = (ConcurrentMap>>) this.clusterContainerService - .createCache("topologymanager.hostsDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.hostsDB Cache already exists - " - + "destroy and recreate if needed"); + log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.hostsDB Cache configuration invalid - " - + "check cache mode"); + log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode"); } try { this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService - .createCache("topologymanager.nodeConnectorDB", EnumSet - .of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheExistException cee) { - log.error("topologymanager.nodeConnectorDB Cache already exists" - + " - destroy and recreate if needed"); + log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { - log.error("topologymanager.nodeConnectorDB Cache configuration " - + "invalid - check cache mode"); + log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode"); } - userLinks = new ConcurrentHashMap(); + try { + this.userLinksDB = (ConcurrentMap) this.clusterContainerService + .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + } catch (CacheExistException cee) { + log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed"); + } catch (CacheConfigException cce) { + log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode"); + } + + try { + this.configSaveEvent = (ConcurrentMap) this.clusterContainerService + .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + } catch (CacheExistException cee) { + log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed"); + } catch (CacheConfigException cce) { + log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode"); + } + + } + + @SuppressWarnings({ "unchecked", "deprecation" }) + private void retrieveCaches() { + if (this.clusterContainerService == null) { + log.error("Cluster Services is null, can't retrieve caches."); + return; + } + + this.edgesDB = (ConcurrentMap>) this.clusterContainerService + .getCache("topologymanager.edgesDB"); + if (edgesDB == null) { + log.error("Failed to get cache for topologymanager.edgesDB"); + } + + this.hostsDB = (ConcurrentMap>>) this.clusterContainerService + .getCache("topologymanager.hostsDB"); + if (hostsDB == null) { + log.error("Failed to get cache for topologymanager.hostsDB"); + } + + this.nodeConnectorsDB = (ConcurrentMap>) this.clusterContainerService + .getCache("topologymanager.nodeConnectorDB"); + if (nodeConnectorsDB == null) { + log.error("Failed to get cache for topologymanager.nodeConnectorDB"); + } + + this.userLinksDB = (ConcurrentMap) this.clusterContainerService + .getCache("topologymanager.userLinksDB"); + if (userLinksDB == null) { + log.error("Failed to get cache for topologymanager.userLinksDB"); + } + + this.configSaveEvent = (ConcurrentMap) this.clusterContainerService + .getCache("topologymanager.configSaveEvent"); + if (configSaveEvent == null) { + log.error("Failed to get cache for topologymanager.configSaveEvent"); + } - userLinksFileName = ROOT + "userTopology_" + containerName + ".conf"; - registerWithOSGIConsole(); - loadConfiguration(); } /** @@ -217,62 +271,38 @@ CommandProvider { * */ void destroy() { - if (this.clusterContainerService == null) { - log.error("Cluster Services is null, not expected!"); - this.edgesDB = null; - this.hostsDB = null; - this.nodeConnectorsDB = null; - return; - } - this.clusterContainerService.destroyCache("topologymanager.edgesDB"); - this.edgesDB = null; - this.clusterContainerService.destroyCache("topologymanager.hostsDB"); - this.hostsDB = null; - this.clusterContainerService - .destroyCache("topologymanager.nodeConnectorDB"); - this.nodeConnectorsDB = null; - log.debug("Topology Manager DB Deallocated"); } @SuppressWarnings("unchecked") private void loadConfiguration() { ObjectReader objReader = new ObjectReader(); - ConcurrentMap confList = (ConcurrentMap) objReader - .read(this, userLinksFileName); - - if (confList == null) { - return; - } + ConcurrentMap confList = + (ConcurrentMap) objReader.read(this, userLinksFileName); - for (TopologyUserLinkConfig conf : confList.values()) { - addUserLink(conf); + if (confList != null) { + for (TopologyUserLinkConfig conf : confList.values()) { + addUserLink(conf); + } } } @Override public Status saveConfig() { - // Publish the save config event to the cluster nodes - /** - * Get the CLUSTERING SERVICES WORKING BEFORE TRYING THIS - * - * configSaveEvent.put(new Date().getTime(), SAVE); - */ + // Publish the save config event to the cluster + configSaveEvent.put(new Date().getTime(), SAVE ); return saveConfigInternal(); } public Status saveConfigInternal() { - Status retS; ObjectWriter objWriter = new ObjectWriter(); - retS = objWriter - .write(new ConcurrentHashMap( - userLinks), userLinksFileName); + Status saveStatus = objWriter.write( + new ConcurrentHashMap(userLinksDB), userLinksFileName); - if (retS.isSuccess()) { - return retS; - } else { - return new Status(StatusCode.INTERNALERROR, "Save failed"); + if (! saveStatus.isSuccess()) { + return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription()); } + return saveStatus; } @Override @@ -281,31 +311,25 @@ CommandProvider { return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> res = new HashMap>(); + for (Edge edge : this.edgesDB.keySet()) { // Lets analyze the tail - Node node = key.getTailNodeConnector().getNode(); + Node node = edge.getTailNodeConnector().getNode(); Set nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); // Lets analyze the head - node = key.getHeadNodeConnector().getNode(); + node = edge.getHeadNodeConnector().getNode(); nodeEdges = res.get(node); if (nodeEdges == null) { nodeEdges = new HashSet(); + res.put(node, nodeEdges); } - nodeEdges.add(key); - // We need to re-add to the MAP even if the element was - // already there so in case of clustered services the map - // gets updated in the cluster - res.put(node, nodeEdges); + nodeEdges.add(edge); } return res; @@ -341,10 +365,8 @@ CommandProvider { * @return true if it is a production link */ public boolean isProductionLink(Edge e) { - return (e.getHeadNodeConnector().getType() - .equals(NodeConnector.NodeConnectorIDType.PRODUCTION) || e - .getTailNodeConnector().getType() - .equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); + return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION) + || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)); } /** @@ -360,31 +382,20 @@ CommandProvider { return null; } - HashMap> res = new HashMap>(); - for (Edge key : this.edgesDB.keySet()) { + Map> edgeMap = new HashMap>(); + Set props; + for (Map.Entry> edgeEntry : edgesDB.entrySet()) { // Sets of props are copied because the composition of // those properties could change with time - HashSet prop = new HashSet( - this.edgesDB.get(key)); + props = new HashSet(edgeEntry.getValue()); // We can simply reuse the key because the object is // immutable so doesn't really matter that we are // referencing the only owned by a different table, the // meaning is the same because doesn't change with time. - res.put(key, prop); + edgeMap.put(edgeEntry.getKey(), props); } - return res; - } - - // TODO remove with spring-dm removal - /** - * @param set - * the topologyAware to set - */ - public void setTopologyAware(Set set) { - for (Object s : set) { - setTopologyManagerAware((ITopologyManagerAware) s); - } + return edgeMap; } @Override @@ -393,7 +404,7 @@ CommandProvider { return null; } - return (this.hostsDB.keySet()); + return (new HashSet(this.hostsDB.keySet())); } @Override @@ -402,90 +413,92 @@ CommandProvider { return null; } HashMap> res = new HashMap>(); - - for (NodeConnector p : this.hostsDB.keySet()) { - Node n = p.getNode(); - Set pSet = res.get(n); - if (pSet == null) { + Node node; + Set portSet; + for (NodeConnector nc : this.hostsDB.keySet()) { + node = nc.getNode(); + portSet = res.get(node); + if (portSet == null) { // Create the HashSet if null - pSet = new HashSet(); - res.put(n, pSet); + portSet = new HashSet(); + res.put(node, portSet); } // Keep updating the HashSet, given this is not a // clustered map we can just update the set without // worrying to update the hashmap. - pSet.add(p); + portSet.add(nc); } return (res); } @Override - public Host getHostAttachedToNodeConnector(NodeConnector p) { - if (this.hostsDB == null) { + public Host getHostAttachedToNodeConnector(NodeConnector port) { + ImmutablePair> host; + if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) { return null; } - if (this.hostsDB.get(p) == null) - return null; - - return (this.hostsDB.get(p).getLeft()); + return host.getLeft(); } @Override - public void updateHostLink(NodeConnector p, Host h, UpdateType t, - Set props) { - if (this.hostsDB == null) { - return; + public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set props) { + + // Clone the property set in case non null else just + // create an empty one. Caches allocated via infinispan + // don't allow null values + if (props == null) { + props = new HashSet(); + } else { + props = new HashSet(props); } + ImmutablePair> thisHost = new ImmutablePair>(h, props); switch (t) { case ADDED: case CHANGED: - // Clone the property set in case non null else just - // create an empty one. Caches allocated via infinispan - // don't allow null values - if (props == null) { - props = new HashSet(); - } else { - props = new HashSet(props); - } - - this.hostsDB.put(p, new ImmutablePair(h, props)); + this.hostsDB.put(port, thisHost); break; case REMOVED: - this.hostsDB.remove(p); + //remove only if hasn't been concurrently modified + this.hostsDB.remove(port, thisHost); break; } } - private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, - Set props) { + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { switch (type) { case ADDED: // Make sure the props are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { - // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); + } + + //in case of node switch-over to a different cluster controller, + //let's retain edge props + Set currentProps = this.edgesDB.get(e); + if (currentProps != null){ + props.addAll(currentProps); } // Now make sure there is the creation timestamp for the - // edge, if not there timestamp with the first update + // edge, if not there, stamp with the first update boolean found_create = false; for (Property prop : props) { if (prop instanceof TimeStamp) { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { found_create = true; + break; } } } if (!found_create) { - TimeStamp t = new TimeStamp(System.currentTimeMillis(), - "creation"); + TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation"); props.add(t); } @@ -498,10 +511,8 @@ CommandProvider { // for now. // The DB only contains ISL ports if (isISLink(e)) { - this.nodeConnectorsDB.put(e.getHeadNodeConnector(), - new HashSet()); - this.nodeConnectorsDB.put(e.getTailNodeConnector(), - new HashSet()); + this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet(1)); + this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet(1)); } log.trace("Edge {} {}", e.toString(), type.name()); break; @@ -521,29 +532,29 @@ CommandProvider { log.trace("Edge {} {}", e.toString(), type.name()); break; case CHANGED: - Set old_props = this.edgesDB.get(e); + Set oldProps = this.edgesDB.get(e); // When property changes lets make sure we can change it // all except the creation time stamp because that should // be changed only when the edge is destroyed and created // again - TimeStamp tc = null; - for (Property prop : old_props) { + TimeStamp timeStamp = null; + for (Property prop : oldProps) { if (prop instanceof TimeStamp) { - TimeStamp t = (TimeStamp) prop; - if (t.getTimeStampName().equals("creation")) { - tc = t; + TimeStamp tsProp = (TimeStamp) prop; + if (tsProp.getTimeStampName().equals("creation")) { + timeStamp = tsProp; + break; } } } // Now lets make sure new properties are non-null - // Make sure the props are non-null if (props == null) { - props = (Set) new HashSet(); + props = new HashSet(); } else { // Copy the set so noone is going to change the content - props = (Set) new HashSet(props); + props = new HashSet(props); } // Now lets remove the creation property if exist in the @@ -554,13 +565,14 @@ CommandProvider { TimeStamp t = (TimeStamp) prop; if (t.getTimeStampName().equals("creation")) { i.remove(); + break; } } } // Now lets add the creation timestamp in it - if (tc != null) { - props.add(tc); + if (timeStamp != null) { + props.add(timeStamp); } // Finally update @@ -587,7 +599,7 @@ CommandProvider { try { s.edgeUpdate(teuList); } catch (Exception exc) { - log.error("Exception on callback", exc); + log.error("Exception on edge update:", exc); } } @@ -601,79 +613,78 @@ CommandProvider { private Edge getLinkTuple(TopologyUserLinkConfig link) { - Edge linkTuple = null; NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector()); NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector()); - if (srcNodeConnector == null || dstNodeConnector == null) return null; try { - linkTuple = new Edge(srcNodeConnector, dstNodeConnector); + return new Edge(srcNodeConnector, dstNodeConnector); } catch (Exception e) { + return null; } - return linkTuple; } @Override public ConcurrentMap getUserLinks() { - return userLinks; + return new ConcurrentHashMap(userLinksDB); } @Override - public Status addUserLink(TopologyUserLinkConfig link) { - if (!link.isValid()) { + public Status addUserLink(TopologyUserLinkConfig userLink) { + if (!userLink.isValid()) { return new Status(StatusCode.BADREQUEST, - "Configuration Invalid. Please check the parameters"); + "User link configuration invalid."); } - if (userLinks.get(link.getName()) != null) { - return new Status(StatusCode.CONFLICT, "Link with name : " - + link.getName() - + " already exists. Please use another name"); + userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); + + //Check if this link already configured + //NOTE: infinispan cache doesn't support Map.containsValue() + // (which is linear time in most ConcurrentMap impl anyway) + for (TopologyUserLinkConfig existingLink : userLinksDB.values()) { + if (existingLink.equals(userLink)) { + return new Status(StatusCode.CONFLICT, "Link configuration exists"); + } } - if (userLinks.containsValue(link)) { - return new Status(StatusCode.CONFLICT, "Link configuration exists"); + //attempt put, if mapping for this key already existed return conflict + if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) { + return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName() + + " already exists. Please use another name"); } - link.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN); - userLinks.put(link.getName(), link); - - Edge linkTuple = getLinkTuple(link); + Edge linkTuple = getLinkTuple(userLink); if (linkTuple != null) { if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } - linkTuple = getReverseLinkTuple(link); + linkTuple = getReverseLinkTuple(userLink); if (linkTuple != null) { - link.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); + userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS); if (!isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet()); } } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } @Override public Status deleteUserLink(String linkName) { if (linkName == null) { - return new Status(StatusCode.BADREQUEST, - "A valid linkName is required to Delete a link"); + return new Status(StatusCode.BADREQUEST, "User link name cannot be null."); } - TopologyUserLinkConfig link = userLinks.get(linkName); - - Edge linkTuple = getLinkTuple(link); - userLinks.remove(linkName); - if (linkTuple != null) { - if (!isProductionLink(linkTuple)) { + TopologyUserLinkConfig link = userLinksDB.remove(linkName); + Edge linkTuple; + if (link != null && (linkTuple = getLinkTuple(link)) != null) { + if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } linkTuple = getReverseLinkTuple(link); - if ((linkTuple != null) && !isProductionLink(linkTuple)) { + if (! isProductionLink(linkTuple)) { edgeUpdate(linkTuple, UpdateType.REMOVED, null); } } - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } private void registerWithOSGIConsole() { @@ -695,8 +706,8 @@ CommandProvider { } public void _printUserLink(CommandInterpreter ci) { - for (String name : this.userLinks.keySet()) { - TopologyUserLinkConfig linkConfig = userLinks.get(name); + for (String name : this.userLinksDB.keySet()) { + TopologyUserLinkConfig linkConfig = userLinksDB.get(name); ci.println("Name : " + name); ci.println(linkConfig); ci.println("Edge " + getLinkTuple(linkConfig)); @@ -770,7 +781,6 @@ CommandProvider { @Override public Object readObject(ObjectInputStream ois) throws FileNotFoundException, IOException, ClassNotFoundException { - // TODO Auto-generated method stub return ois.readObject(); } -- 2.36.6