import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Create an executor pool to create the distributionOrder, this is a stop
+ * gap solution caused by an issue with non-transactional caches in the
+ * implementation we use, being currently worked on. It has been noticed in
+ * fact that when non-transactional caches are being used sometime the key
+ * are no distributed to all the nodes properly. To workaround the issue
+ * transactional caches are being used, but there was a reason for using
+ * non-transactional caches to start with, in fact we needed to be able in
+ * the context of a northbound transaction to program the FRM entries
+ * irrespective of the fact that transaction would commit or no else we
+ * would not be able to achieve the entry programming and implement the
+ * scheme for recovery from network element failures. Bottom line, now in
+ * order to make sure an update on a transactional cache goes out while in a
+ * transaction that need to be initiated by a different thread.
+ */
+ private ExecutorService executor;
+
+ class DistributeOrderCallable implements Callable<Future<Status>> {
+ private FlowEntryInstall e;
+ private FlowEntryInstall u;
+ private UpdateType t;
+ DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ this.e = e;
+ this.u = u;
+ this.t = t;
+ }
+
+ @Override
+ public Future<Status> call() throws Exception {
+ if (e == null || t == null) {
+ logsync.error("Unexpected null Entry up update type");
+ return null;
+ }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
+ }
+ logsync.trace("WorkOrder requested");
+ // Now create an Handle to monitor the execution of the operation
+ return ret;
+ }
+ }
+
/**
* @param e
* Entry being installed/updated/removed
Node n = e.getNode();
if (!connectionManager.isLocal(n)) {
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", n, fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
+ Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
+ if (worker != null) {
+ Future<Future<Status>> workerRes = this.executor.submit(worker);
+ try {
+ return workerRes.get();
+ } catch (InterruptedException e1) {
+ // we where interrupted, not a big deal.
+ return null;
+ } catch (ExecutionException e1) {
+ logsync.error(
+ "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
+ e);
+ return null;
+ }
}
- logsync.trace("WorkOrder requested");
- // Now create an Handle to monitor the execution of the operation
- return ret;
}
logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
-
return null;
}
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
/**
* Uninstall all the non-internal Flow Entries present in the software view.
- * A copy of each entry is stored in the inactive list so that it can be
- * re-applied when needed. This function is called on the global instance of
- * FRM only, when the first container is created
+ * If requested, a copy of each original flow entry will be stored in the
+ * inactive list so that it can be re-applied when needed (This is typically
+ * the case when running in the default container and controller moved to
+ * container mode)
+ *
+ * @param preserveFlowEntries
+ * if true, a copy of each original entry is stored in the
+ * inactive list
*/
- private void uninstallAllFlowEntries() {
+ private void uninstallAllFlowEntries(boolean preserveFlowEntries) {
log.info("Uninstalling all non-internal flows");
+ List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>();
+
// Store entries / create target list
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> mapEntry : installedSwView.entrySet()) {
FlowEntryInstall flowEntries = mapEntry.getValue();
// Skip internal generated static flows
if (!flowEntries.isInternal()) {
- inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ toRemove.add(flowEntries);
+ // Store the original entries if requested
+ if (preserveFlowEntries) {
+ inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ }
}
}
// Now remove the entries
- for (FlowEntry flowEntry : inactiveFlows.keySet()) {
- Status status = this.removeEntry(flowEntry, false);
+ for (FlowEntryInstall flowEntryHw : toRemove) {
+ Status status = this.removeEntryInternal(flowEntryHw, false);
if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription());
+ log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
}
}
}
if (staticFlows.isEmpty()) {
loadFlowConfiguration();
}
+
+ // Allocate the executor service
+ this.executor = Executors.newSingleThreadExecutor();
}
/**
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries();
+ uninstallAllFlowEntries(false);
+ // Shutdown executor
+ this.executor.shutdownNow();
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
}
switch (update) {
case ADDED:
+ /*
+ * Controller is moving to container mode. We are in the default
+ * container context, we need to remove all our non-internal flows
+ * to prevent any container isolation breakage. We also need to
+ * preserve our flow so that they can be re-installed if we move
+ * back to non container mode (no containers).
+ */
this.inContainerMode = true;
- this.uninstallAllFlowEntries();
+ this.uninstallAllFlowEntries(true);
break;
case REMOVED:
this.inContainerMode = false;