}
}
- boolean proactiveForwarding = false;
+ boolean forwardingModeChanged = false;
+
// copy node properties from config
if (nodeConfigList != null) {
String nodeId = node.toString();
propMap.putAll(nodeProperties);
if (nodeProperties.get(ForwardingMode.name) != null) {
ForwardingMode mode = (ForwardingMode) nodeProperties.get(ForwardingMode.name);
- proactiveForwarding = mode.isProactive();
+ forwardingModeChanged = mode.isProactive();
}
}
}
Property defaultMode = new ForwardingMode(ForwardingMode.REACTIVE_FORWARDING);
propMap.put(ForwardingMode.name, defaultMode);
}
- boolean result = false;
- if (propMapCurr == null) {
- if (nodeProps.putIfAbsent(node, propMap) == null) {
- result = true;
- }
+
+ boolean propsAdded = false;
+ // Attempt initial add
+ if (nodeProps.putIfAbsent(node, propMap) == null) {
+ propsAdded = true;
+
+ /* Notify listeners only for initial node addition
+ * to avoid expensive tasks triggered by redundant notifications
+ */
+ notifyNode(node, UpdateType.ADDED, propMap);
} else {
- result = nodeProps.replace(node, propMapCurr, propMap);
+
+ propsAdded = nodeProps.replace(node, propMapCurr, propMap);
+
+ // check whether forwarding mode changed
+ if (propMapCurr.get(ForwardingMode.name) != null) {
+ ForwardingMode mode = (ForwardingMode) propMapCurr.get(ForwardingMode.name);
+ forwardingModeChanged ^= mode.isProactive();
+ }
}
- if (!result) {
- log.debug("Cluster conflict: Conflict while adding the node properties. Node: {} Properties: {}",
- node.getID(), props);
+ if (!propsAdded) {
+ log.debug("Cluster conflict while adding node {}. Overwriting with latest props: {}", node.getID(), props);
addNodeProps(node, propMap);
}
- // check if span ports are configed
+ // check if span ports are configured
addSpanPorts(node);
-
- // notify node listeners
- notifyNode(node, UpdateType.ADDED, propMap);
-
// notify proactive mode forwarding
- if (proactiveForwarding) {
+ if (forwardingModeChanged) {
notifyModeChange(node, true);
}
}
if (nodeProps == null) {
return;
}
- nodeProps.remove(node);
+
+ if (nodeProps.remove(node) == null) {
+ log.debug("Received redundant node REMOVED udate for {}. Skipping..", node);
+ return;
+ }
+
nodeConnectorNames.remove(node);
Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProps.entrySet()) {
switch (type) {
case ADDED:
+ // Skip redundant ADDED update (e.g. cluster switch-over)
+ if (nodeConnectorProps.containsKey(nodeConnector)) {
+ log.debug("Redundant nodeconnector ADDED for {}, props {} for container {}",
+ nodeConnector, props, containerName);
+ update = false;
+ }
+
if (props != null) {
for (Property prop : props) {
addNodeConnectorProp(nodeConnector, prop);
addNodeConnectorProp(nodeConnector, null);
}
+
addSpanPort(nodeConnector);
break;
case CHANGED:
// only add if span is configured on this nodeConnector
for (SpanConfig conf : getSpanConfigList(nodeConnector.getNode())) {
if (conf.getPortArrayList().contains(nodeConnector)) {
- List<NodeConnector> ncLists = new ArrayList<NodeConnector>();
- ncLists.add(nodeConnector);
- addSpanPorts(nodeConnector.getNode(), ncLists);
+ List<NodeConnector> ncList = new ArrayList<NodeConnector>();
+ ncList.add(nodeConnector);
+ addSpanPorts(nodeConnector.getNode(), ncList);
return;
}
}