private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
- /*
- * Create an executor pool to create the distributionOrder, this is a stop
- * gap solution caused by an issue with non-transactional caches in the
- * implementation we use, being currently worked on. It has been noticed in
- * fact that when non-transactional caches are being used sometime the key
- * are no distributed to all the nodes properly. To workaround the issue
- * transactional caches are being used, but there was a reason for using
- * non-transactional caches to start with, in fact we needed to be able in
- * the context of a northbound transaction to program the FRM entries
- * irrespective of the fact that transaction would commit or no else we
- * would not be able to achieve the entry programming and implement the
- * scheme for recovery from network element failures. Bottom line, now in
- * order to make sure an update on a transactional cache goes out while in a
- * transaction that need to be initiated by a different thread.
- */
- private ExecutorService executor;
-
- class DistributeOrderCallable implements Callable<Future<Status>> {
- private FlowEntryInstall e;
- private FlowEntryInstall u;
- private UpdateType t;
- DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
- this.e = e;
- this.u = u;
- this.t = t;
- }
-
- @Override
- public Future<Status> call() throws Exception {
- if (e == null || t == null) {
- logsync.error("Unexpected null Entry up update type");
- return null;
- }
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
- }
- logsync.trace("WorkOrder requested");
- // Now create an Handle to monitor the execution of the operation
- return ret;
- }
- }
-
/**
* @param e
* Entry being installed/updated/removed
Node n = e.getNode();
if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
- Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
- if (worker != null) {
- Future<Future<Status>> workerRes = this.executor.submit(worker);
- try {
- return workerRes.get();
- } catch (InterruptedException e1) {
- // we where interrupted, not a big deal.
- return null;
- } catch (ExecutionException e1) {
- logsync.error(
- "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
- e);
- return null;
- }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", n, fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
}
+ logsync.trace("WorkOrder requested");
+ // Return the handle used to monitor the execution of the operation
+ return ret;
}
logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
* practice to have in its context operations that can take time,
* hence moving off to a different thread for async processing.
*/
+ private ExecutorService executor;
@Override
public void modeChangeNotify(final Node node, final boolean proactive) {
Callable<Status> modeChangeCallable = new Callable<Status>() {
Map<Node, Set<Edge>> nodeEdges = topologyManager.getNodeEdges();
Map<Node, Set<NodeConnector>> hostEdges = topologyManager
.getNodesWithNodeConnectorHost();
+ int hostEdgesHashCode = getHostHashCode(hostEdges, topologyManager);
List<Switch> nodes = switchManager.getNetworkDevices();
List<SwitchConfig> switchConfigurations = new ArrayList<SwitchConfig>();
// return cache if topology hasn't changed
if (
(metaNodeHash.get(containerName) != null && metaHostHash.get(containerName) != null && metaNodeSingleHash.get(containerName) != null && metaNodeConfigurationHash.get(containerName) != null) &&
- metaNodeHash.get(containerName).equals(nodeEdges.hashCode()) && metaHostHash.get(containerName).equals(hostEdges.hashCode()) && metaNodeSingleHash.get(containerName).equals(nodes.hashCode()) && metaNodeConfigurationHash.get(containerName).equals(switchConfigurations.hashCode())
+ metaNodeHash.get(containerName).equals(nodeEdges.hashCode()) && metaHostHash.get(containerName).equals(hostEdgesHashCode) && metaNodeSingleHash.get(containerName).equals(nodes.hashCode()) && metaNodeConfigurationHash.get(containerName).equals(switchConfigurations.hashCode())
) {
return metaCache.get(containerName).values();
}
// cache has changed, we must assign the new values
metaNodeHash.put(containerName, nodeEdges.hashCode());
- metaHostHash.put(containerName, hostEdges.hashCode());
+ metaHostHash.put(containerName, hostEdgesHashCode);
metaNodeSingleHash.put(containerName, nodes.hashCode());
metaNodeConfigurationHash.put(containerName, switchConfigurations.hashCode());
}
}
+ /**
+ * Calculate the hash code over all hosts attached in this topology
+ *
+ * This handles the case where multiple hosts are attached to the same
+ * NodeConnector, which a hash of the hostEdges map alone would not detect
+ *
+ * @param hostEdges
+ * - map of nodes to the node connectors that have hosts attached
+ * @param topology
+ * - topology manager used to resolve the hosts attached to each node connector
+ * @return the hash code of all hosts attached in this topology
+ */
+ private int getHostHashCode(Map<Node, Set<NodeConnector>> hostEdges, ITopologyManager topology) {
+ List<Host> hosts = new ArrayList<Host>();
+ for (Set<NodeConnector> nodeConnectors : hostEdges.values()) {
+ for (NodeConnector nodeConnector : nodeConnectors) {
+ List<Host> theseHosts = topology.getHostsAttachedToNodeConnector(nodeConnector);
+ hosts.addAll(theseHosts);
+ }
+ }
+
+ return hosts.hashCode();
+ }
+
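For illustration, a minimal self-contained sketch of the case getHostHashCode() covers, using plain String stand-ins (hypothetical class name and values) for the SAL Node, NodeConnector and Host types: when a second host attaches to a connector that already has one, the hostEdges map is unchanged, so its hash code alone would keep returning the stale cached topology, while a hash over every attached host detects the change.

import java.util.*;

public class HostHashSketch {
    public static void main(String[] args) {
        // Hosts attached to each node connector (stand-in for what
        // getHostsAttachedToNodeConnector() would return)
        Map<String, List<String>> hostsPerConnector = new HashMap<String, List<String>>();
        hostsPerConnector.put("of|1|eth1", new ArrayList<String>(Arrays.asList("10.0.0.1")));

        // Node -> node connectors that have hosts, analogous to getNodesWithNodeConnectorHost()
        Map<String, Set<String>> hostEdges = new HashMap<String, Set<String>>();
        hostEdges.put("of|1", new HashSet<String>(Arrays.asList("of|1|eth1")));

        int mapHashBefore = hostEdges.hashCode();
        int hostHashBefore = allHostsHash(hostEdges, hostsPerConnector);

        // A second host attaches to the same connector: hostEdges itself is unchanged
        hostsPerConnector.get("of|1|eth1").add("10.0.0.2");

        // true: the map hash misses the change
        System.out.println(mapHashBefore == hostEdges.hashCode());
        // false: hashing every attached host detects it
        System.out.println(hostHashBefore == allHostsHash(hostEdges, hostsPerConnector));
    }

    // Mirrors the shape of getHostHashCode(): collect every attached host and hash the list
    private static int allHostsHash(Map<String, Set<String>> hostEdges,
            Map<String, List<String>> hostsPerConnector) {
        List<String> hosts = new ArrayList<String>();
        for (Set<String> connectors : hostEdges.values()) {
            for (String connector : connectors) {
                hosts.addAll(hostsPerConnector.get(connector));
            }
        }
        return hosts.hashCode();
    }
}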
/**
* Add regular hosts to main topology
*