X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=d08b24d0783f796f2bda87da71fa34833d37411f;hp=0d0874990e88a5902e510a62ca31aa1294c3d67d;hb=cb6fd7676c31ccae1f13077a48bc2e36e67089b4;hpb=4b863003a7ba2da0df528f806471262d0c800a09 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index 0d0874990e..d08b24d078 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -22,9 +22,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -198,6 +201,59 @@ public class ForwardingRulesManager implements private ConcurrentMap workMonitor = new ConcurrentHashMap(); + /* + * Create an executor pool to create the distributionOrder, this is a stop + * gap solution caused by an issue with non-transactional caches in the + * implementation we use, being currently worked on. It has been noticed in + * fact that when non-transactional caches are being used sometime the key + * are no distributed to all the nodes properly. To workaround the issue + * transactional caches are being used, but there was a reason for using + * non-transactional caches to start with, in fact we needed to be able in + * the context of a northbound transaction to program the FRM entries + * irrespective of the fact that transaction would commit or no else we + * would not be able to achieve the entry programming and implement the + * scheme for recovery from network element failures. Bottom line, now in + * order to make sure an update on a transactional cache goes out while in a + * transaction that need to be initiated by a different thread. + */ + private ExecutorService executor; + + class DistributeOrderCallable implements Callable> { + private FlowEntryInstall e; + private FlowEntryInstall u; + private UpdateType t; + DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) { + this.e = e; + this.u = u; + this.t = t; + } + + @Override + public Future call() throws Exception { + if (e == null || t == null) { + logsync.error("Unexpected null Entry up update type"); + return null; + } + // Create the work order and distribute it + FlowEntryDistributionOrder fe = + new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress()); + // First create the monitor job + FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe); + logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe); + workMonitor.put(fe, ret); + if (t.equals(UpdateType.CHANGED)) { + // Then distribute the work + workOrder.put(fe, u); + } else { + // Then distribute the work + workOrder.put(fe, e); + } + logsync.trace("WorkOrder requested"); + // Now create an Handle to monitor the execution of the operation + return ret; + } + } + /** * @param e * Entry being installed/updated/removed @@ -218,27 +274,24 @@ public class ForwardingRulesManager implements Node n = e.getNode(); if (!connectionManager.isLocal(n)) { - // Create the work order and distribute it - FlowEntryDistributionOrder fe = - new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress()); - // First create the monitor job - FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe); - logsync.trace("Node {} not local so sending fe {}", n, fe); - workMonitor.put(fe, ret); - if (t.equals(UpdateType.CHANGED)) { - // Then distribute the work - workOrder.put(fe, u); - } else { - // Then distribute the work - workOrder.put(fe, e); + Callable> worker = new DistributeOrderCallable(e, u, t); + if (worker != null) { + Future> workerRes = this.executor.submit(worker); + try { + return workerRes.get(); + } catch (InterruptedException e1) { + // we where interrupted, not a big deal. + return null; + } catch (ExecutionException e1) { + logsync.error( + "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for", + e); + return null; + } } - logsync.trace("WorkOrder requested"); - // Now create an Handle to monitor the execution of the operation - return ret; } logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t); - return null; } @@ -1353,10 +1406,10 @@ public class ForwardingRulesManager implements EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache(WORKSTATUSCACHE, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache(WORKORDERCACHE, - EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); } catch (CacheConfigException cce) { log.error("CacheConfigException"); @@ -1849,27 +1902,38 @@ public class ForwardingRulesManager implements /** * Uninstall all the non-internal Flow Entries present in the software view. - * A copy of each entry is stored in the inactive list so that it can be - * re-applied when needed. This function is called on the global instance of - * FRM only, when the first container is created + * If requested, a copy of each original flow entry will be stored in the + * inactive list so that it can be re-applied when needed (This is typically + * the case when running in the default container and controller moved to + * container mode) + * + * @param preserveFlowEntries + * if true, a copy of each original entry is stored in the + * inactive list */ - private void uninstallAllFlowEntries() { + private void uninstallAllFlowEntries(boolean preserveFlowEntries) { log.info("Uninstalling all non-internal flows"); + List toRemove = new ArrayList(); + // Store entries / create target list for (ConcurrentMap.Entry mapEntry : installedSwView.entrySet()) { FlowEntryInstall flowEntries = mapEntry.getValue(); // Skip internal generated static flows if (!flowEntries.isInternal()) { - inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal()); + toRemove.add(flowEntries); + // Store the original entries if requested + if (preserveFlowEntries) { + inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal()); + } } } // Now remove the entries - for (FlowEntry flowEntry : inactiveFlows.keySet()) { - Status status = this.removeEntry(flowEntry, false); + for (FlowEntryInstall flowEntryHw : toRemove) { + Status status = this.removeEntryInternal(flowEntryHw, false); if (!status.isSuccess()) { - log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription()); + log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription()); } } } @@ -2453,6 +2517,9 @@ public class ForwardingRulesManager implements // Initialize graceful stop flag stopping = false; + // Allocate the executor service + this.executor = Executors.newSingleThreadExecutor(); + // Start event handler thread frmEventHandler.start(); @@ -2472,7 +2539,9 @@ public class ForwardingRulesManager implements */ void stop() { stopping = true; - uninstallAllFlowEntries(); + uninstallAllFlowEntries(false); + // Shutdown executor + this.executor.shutdownNow(); } public void setFlowProgrammerService(IFlowProgrammerService service) { @@ -2588,8 +2657,15 @@ public class ForwardingRulesManager implements } switch (update) { case ADDED: + /* + * Controller is moving to container mode. We are in the default + * container context, we need to remove all our non-internal flows + * to prevent any container isolation breakage. We also need to + * preserve our flow so that they can be re-installed if we move + * back to non container mode (no containers). + */ this.inContainerMode = true; - this.uninstallAllFlowEntries(); + this.uninstallAllFlowEntries(true); break; case REMOVED: this.inContainerMode = false;