* topology database and notifies all the listeners of topology changes.
*/
public class TopologyManagerImpl implements
- ICacheUpdateAware,
+ ICacheUpdateAware<Object, Object>,
ITopologyManager,
IConfigurationContainerAware,
IListenTopoUpdates,
notifyThread = new Thread(new TopologyNotify(notifyQ));
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void allocateCaches() {
- try {
this.edgesDB =
- (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
- }
+ (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- try {
this.hostsDB =
- (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
- TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
- }
+ (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- try {
this.nodeConnectorsDB =
- (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+ (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
- }
-
- try {
this.userLinksDB =
- (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+ (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
+ }
+
+ private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
+ ConcurrentMap<?, ?> cache = null;
+ try {
+ cache = this.clusterContainerService.createCache(cacheName, cacheModes);
+ } catch (CacheExistException e) {
+ log.debug(cacheName + " cache already exists - destroy and recreate if needed");
+ } catch (CacheConfigException e) {
+ log.error(cacheName + " cache configuration invalid - check cache mode");
}
+ return cache;
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
if (this.clusterContainerService == null) {
log.error("Cluster Services is null, can't retrieve caches.");
}
}
- private boolean nodeConnectorsExist(Edge e) {
+ private boolean headNodeConnectorExist(Edge e) {
+ /*
+ * Only check the head end point which is supposed to be part of a
+ * network node we control (present in our inventory). If we checked the
+ * tail end point as well, we would not store the edges that connect to
+ * a non sdn enable port on a non sdn capable production switch. We want
+ * to be able to see these switches on the topology.
+ */
NodeConnector head = e.getHeadNodeConnector();
- NodeConnector tail = e.getTailNodeConnector();
- return (switchManager.doesNodeConnectorExist(head) &&
- switchManager.doesNodeConnectorExist(tail));
+ return (switchManager.doesNodeConnectorExist(head));
}
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
switch (type) {
case ADDED:
- // Ensure that both tail and head node connectors exist.
- if (!nodeConnectorsExist(e)) {
+ // Avoid redundant update as notifications trigger expensive tasks
+ if (edgesDB.containsKey(e)) {
+ log.trace("Skipping redundant edge addition: {}", e);
+ return null;
+ }
+
+ // Ensure that head node connector exists
+ if (!headNodeConnectorExist(e)) {
log.warn("Ignore edge that contains invalid node connector: {}", e);
return null;
}
public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
final Edge e = (Edge) key;
- log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
final Set<Property> props = (Set<Property>) new_value;
edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
}