private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
- /*
- * Create an executor pool used to create the distributionOrder. This is a
- * stop-gap for an issue, currently being worked on, with the
- * non-transactional caches in the implementation we use: when
- * non-transactional caches are used, the keys are sometimes not
- * distributed to all the nodes properly. To work around the issue,
- * transactional caches are used instead, but there was a reason for using
- * non-transactional caches to start with: we need to be able, in the
- * context of a northbound transaction, to program the FRM entries
- * regardless of whether that transaction commits, otherwise we could not
- * achieve the entry programming and implement the scheme for recovery
- * from network element failures. Bottom line: to make sure an update on a
- * transactional cache goes out while inside a transaction, it has to be
- * initiated by a different thread.
- */
- private ExecutorService executor;
-
- class DistributeOrderCallable implements Callable<Future<Status>> {
- private FlowEntryInstall e;
- private FlowEntryInstall u;
- private UpdateType t;
- DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
- this.e = e;
- this.u = u;
- this.t = t;
- }
-
- @Override
- public Future<Status> call() throws Exception {
- if (e == null || t == null) {
- logsync.error("Unexpected null Entry up update type");
- return null;
- }
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
- }
- logsync.trace("WorkOrder requested");
- // Return a handle to monitor the execution of the operation
- return ret;
- }
- }
-
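
The comment in the block removed above describes a general pattern worth spelling out: a write that must survive regardless of the caller's ongoing cache transaction is handed to a dedicated thread, where it runs in its own transaction context. A minimal sketch of that idea, with a plain Map standing in for the clustered cache and a single-thread executor as an assumed configuration (names here are illustrative, not the project's API):

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class OffThreadCacheWriter {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        // Run the put() on the executor thread so it executes outside any
        // transaction bound to the caller's thread.
        <K, V> Future<V> putOutsideCallerTransaction(final Map<K, V> cache,
                final K key, final V value) {
            return executor.submit(new Callable<V>() {
                @Override
                public V call() {
                    return cache.put(key, value);
                }
            });
        }
    }
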
/**
* @param e
* Entry being installed/updated/removed
Node n = e.getNode();
if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
- Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
- if (worker != null) {
- Future<Future<Status>> workerRes = this.executor.submit(worker);
- try {
- return workerRes.get();
- } catch (InterruptedException e1) {
- // We were interrupted; not a big deal.
- return null;
- } catch (ExecutionException e1) {
- logsync.error(
- "Got an execution exception {}; nothing we can do about it, so there is nothing to wait for",
- e1);
- return null;
- }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", n, fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
}
+ logsync.trace("WorkOrder requested");
+ // Return the handle used to monitor the execution of the operation
+ return ret;
}
logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
* practice to have in its context operations that can take time,
* hence moving off to a different thread for async processing.
*/
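+ // Executor used to run potentially slow operations off the caller's thread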
+ private ExecutorService executor;
@Override
public void modeChangeNotify(final Node node, final boolean proactive) {
Callable<Status> modeChangeCallable = new Callable<Status>() {
} else {
log.warn("Not expected null WorkStatus", work);
}
+ } else if (event instanceof ContainerFlowChangeEvent) {
+ /*
+ * Whether it is an addition or removal, we have to
+ * recompute the merged flow entries taking into
+ * account all the current container flows because
+ * flow merging is not an injective function
+ */
+ updateFlowsContainerFlow();
} else {
log.warn("Dequeued unknown event {}", event.getClass()
.getSimpleName());
}
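
The "not an injective function" remark in the new comment is the crux: distinct flow entries can merge with a container flow into the same installed entry, so a container-flow change cannot be undone entry by entry and the merge must be recomputed from scratch. A toy model, where a match is a port range and merging is range intersection, shows the collapse (illustrative only, not the FRM merge logic):

    import java.util.Arrays;

    class MergeNotInjective {
        // Merge = intersection of the flow's range with the container's range
        static int[] merge(int[] flow, int[] container) {
            int lo = Math.max(flow[0], container[0]);
            int hi = Math.min(flow[1], container[1]);
            return (lo <= hi) ? new int[] { lo, hi } : null;
        }

        public static void main(String[] args) {
            int[] container = { 100, 200 };
            // Two distinct flows collapse onto the same merged entry
            // [100, 200]; the merged entry alone cannot identify its source.
            System.out.println(Arrays.toString(merge(new int[] { 0, 300 }, container)));
            System.out.println(Arrays.toString(merge(new int[] { 50, 250 }, container)));
        }
    }
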
log.trace("Container {}: Updating installed flows because of container flow change: {} {}",
container.getName(), t, current);
- /*
- * Whether it is an addition or removal, we have to recompute the merged
- * flow entries taking into account all the current container flows
- * because flow merging is not an injective function
- */
- updateFlowsContainerFlow();
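+ // Queue the recomputation on the FRM event dispatcher instead of running it inline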
+ ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t);
+ pendingEvents.offer(ev);
}
@Override
return newEntry;
}
}
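+ /*
+  * Event queued on a container flow change; carries the previous and
+  * current container flow plus the update type so the dispatcher thread
+  * can recompute the merged flow entries asynchronously.
+  */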
+ private class ContainerFlowChangeEvent extends FRMEvent {
+ private final ContainerFlow previous;
+ private final ContainerFlow current;
+ private final UpdateType type;
+
+ public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) {
+ this.previous = previous;
+ this.current = current;
+ this.type = type;
+ }
+
+ public ContainerFlow getPrevious() {
+ return this.previous;
+ }
+
+ public ContainerFlow getCurrent() {
+ return this.current;
+ }
+
+ public UpdateType getType() {
+ return this.type;
+ }
+ }
+
private class WorkStatusCleanup extends FRMEvent {
private FlowEntryDistributionOrder fe;