private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
- /*
- * Create an executor pool to create the distributionOrder, this is a stop
- * gap solution caused by an issue with non-transactional caches in the
- * implementation we use, being currently worked on. It has been noticed in
- * fact that when non-transactional caches are being used sometime the key
- * are no distributed to all the nodes properly. To workaround the issue
- * transactional caches are being used, but there was a reason for using
- * non-transactional caches to start with, in fact we needed to be able in
- * the context of a northbound transaction to program the FRM entries
- * irrespective of the fact that transaction would commit or no else we
- * would not be able to achieve the entry programming and implement the
- * scheme for recovery from network element failures. Bottom line, now in
- * order to make sure an update on a transactional cache goes out while in a
- * transaction that need to be initiated by a different thread.
- */
- private ExecutorService executor;
-
- class DistributeOrderCallable implements Callable<Future<Status>> {
- private FlowEntryInstall e;
- private FlowEntryInstall u;
- private UpdateType t;
- DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
- this.e = e;
- this.u = u;
- this.t = t;
- }
-
- @Override
- public Future<Status> call() throws Exception {
- if (e == null || t == null) {
- logsync.error("Unexpected null Entry up update type");
- return null;
- }
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
- }
- logsync.trace("WorkOrder requested");
- // Now create an Handle to monitor the execution of the operation
- return ret;
- }
- }
-
/**
* @param e
* Entry being installed/updated/removed
Node n = e.getNode();
if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
- Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
- if (worker != null) {
- Future<Future<Status>> workerRes = this.executor.submit(worker);
- try {
- return workerRes.get();
- } catch (InterruptedException e1) {
- // we where interrupted, not a big deal.
- return null;
- } catch (ExecutionException e1) {
- logsync.error(
- "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
- e);
- return null;
- }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", n, fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
}
+ logsync.trace("WorkOrder requested");
+ // Now create an Handle to monitor the execution of the operation
+ return ret;
}
logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
* pratice to have in it's context operations that can take time,
* hence moving off to a different thread for async processing.
*/
+ private ExecutorService executor;
@Override
public void modeChangeNotify(final Node node, final boolean proactive) {
Callable<Status> modeChangeCallable = new Callable<Status>() {