import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
* topology database and notifies all the listeners of topology changes.
*/
public class TopologyManagerImpl implements
- ICacheUpdateAware,
+ ICacheUpdateAware<Object, Object>,
ITopologyManager,
IConfigurationContainerAware,
IListenTopoUpdates,
if (this.topologyManagerAware != null) {
log.debug("Adding ITopologyManagerAware: {}", s);
this.topologyManagerAware.add(s);
- // Reply all the known edges
- if (this.edgesDB != null) {
- List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
- for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
- existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
- }
- s.edgeUpdate(existingEdges);
- }
}
}
if (this.topologyManagerClusterWideAware != null) {
log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
this.topologyManagerClusterWideAware.add(s);
- // Reply all the known edges
- if (this.edgesDB != null) {
- List<TopoEdgeUpdate> existingEdges = new ArrayList<TopoEdgeUpdate>();
- for (Entry<Edge, Set<Property>> entry : this.edgesDB.entrySet()) {
- existingEdges.add(new TopoEdgeUpdate(entry.getKey(), entry.getValue(), UpdateType.ADDED));
- }
- s.edgeUpdate(existingEdges);
- }
}
}
notifyThread = new Thread(new TopologyNotify(notifyQ));
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void allocateCaches() {
try {
this.edgesDB =
}
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
if (this.clusterContainerService == null) {
log.error("Cluster Services is null, can't retrieve caches.");
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
switch (type) {
case ADDED:
+ // Avoid redundant update as notifications trigger expensive tasks
+ if (edgesDB.containsKey(e)) {
+ log.trace("Skipping redundant edge addition: {}", e);
+ return null;
+ }
// Ensure that both tail and head node connectors exist.
if (!nodeConnectorsExist(e)) {
log.warn("Ignore edge that contains invalid node connector: {}", e);
public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
final Edge e = (Edge) key;
- log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ log.trace("Edge {} UPDATED isLocal:{}", e, originLocal);
final Set<Property> props = (Set<Property>) new_value;
edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
}