Removing the Container Aware dependency from Clustering Services.
[controller.git] / opendaylight / forwardingrulesmanager / implementation / src / main / java / org / opendaylight / controller / forwardingrulesmanager / internal / ForwardingRulesManager.java
index d08b24d0783f796f2bda87da71fa34833d37411f..fc18079b0afb4537bb7a38a5da3d01fe3736b07d 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.connectionmanager.ConnectionLocality;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
@@ -201,59 +202,6 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
             new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
 
-    /*
-     * Create an executor pool to create the distributionOrder, this is a stop
-     * gap solution caused by an issue with non-transactional caches in the
-     * implementation we use, being currently worked on. It has been noticed in
-     * fact that when non-transactional caches are being used sometime the key
-     * are no distributed to all the nodes properly. To workaround the issue
-     * transactional caches are being used, but there was a reason for using
-     * non-transactional caches to start with, in fact we needed to be able in
-     * the context of a northbound transaction to program the FRM entries
-     * irrespective of the fact that transaction would commit or no else we
-     * would not be able to achieve the entry programming and implement the
-     * scheme for recovery from network element failures. Bottom line, now in
-     * order to make sure an update on a transactional cache goes out while in a
-     * transaction that need to be initiated by a different thread.
-     */
-    private ExecutorService executor;
-
-    class DistributeOrderCallable implements Callable<Future<Status>> {
-        private FlowEntryInstall e;
-        private FlowEntryInstall u;
-        private UpdateType t;
-        DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
-            this.e = e;
-            this.u = u;
-            this.t = t;
-        }
-
-        @Override
-        public Future<Status> call() throws Exception {
-            if (e == null || t == null) {
-                logsync.error("Unexpected null Entry up update type");
-                return null;
-            }
-            // Create the work order and distribute it
-            FlowEntryDistributionOrder fe =
-                    new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
-            // First create the monitor job
-            FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
-            logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
-            workMonitor.put(fe, ret);
-            if (t.equals(UpdateType.CHANGED)) {
-                // Then distribute the work
-                workOrder.put(fe, u);
-            } else {
-                // Then distribute the work
-                workOrder.put(fe, e);
-            }
-            logsync.trace("WorkOrder requested");
-            // Now create an Handle to monitor the execution of the operation
-            return ret;
-        }
-    }
-
     /**
      * @param e
      *            Entry being installed/updated/removed
@@ -273,25 +221,27 @@ public class ForwardingRulesManager implements
         }
 
         Node n = e.getNode();
-        if (!connectionManager.isLocal(n)) {
-            Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
-            if (worker != null) {
-                Future<Future<Status>> workerRes = this.executor.submit(worker);
-                try {
-                    return workerRes.get();
-                } catch (InterruptedException e1) {
-                    // we where interrupted, not a big deal.
-                    return null;
-                } catch (ExecutionException e1) {
-                    logsync.error(
-                            "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
-                            e);
-                    return null;
-                }
+        if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
+            // Create the work order and distribute it
+            FlowEntryDistributionOrder fe =
+                    new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+            // First create the monitor job
+            FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+            logsync.trace("Node {} not local so sending fe {}", n, fe);
+            workMonitor.put(fe, ret);
+            if (t.equals(UpdateType.CHANGED)) {
+                // Then distribute the work
+                workOrder.put(fe, u);
+            } else {
+                // Then distribute the work
+                workOrder.put(fe, e);
             }
+            logsync.trace("WorkOrder requested");
+            // Now create an Handle to monitor the execution of the operation
+            return ret;
         }
 
-        logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
+        logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
         return null;
     }
 
@@ -1406,10 +1356,10 @@ public class ForwardingRulesManager implements
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
 
         } catch (CacheConfigException cce) {
             log.error("CacheConfigException");
@@ -1867,9 +1817,13 @@ public class ForwardingRulesManager implements
             }
         }
         if (target != null) {
-            // Program the network node
-            Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
-                    .installFlowEntry(target.getFlowEntry());
+            Status status = target.validate(container);
+            if (!status.isSuccess()) {
+                log.warn(status.getDescription());
+                return status;
+            }
+            status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
+                                    .installFlowEntry(target.getFlowEntry());
             if (status.isSuccess()) {
                 // Update Configuration database
                 target.setStatus(SUCCESS);
@@ -2096,51 +2050,79 @@ public class ForwardingRulesManager implements
         addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name
     }
 
+    /**
+     * (non-Javadoc)
+     *
+     * @see org.opendaylight.controller.switchmanager.ISwitchManagerAware#modeChangeNotify(org.opendaylight.controller.sal.core.Node,
+     *      boolean)
+     *
+     *      This method can be called from within the OSGi framework context,
+     *      given the programming operation can take sometime, it not good
+     *      pratice to have in it's context operations that can take time,
+     *      hence moving off to a different thread for async processing.
+     */
+    private ExecutorService executor;
     @Override
-    public void modeChangeNotify(Node node, boolean proactive) {
-        List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
-
-        List<String> puntAction = new ArrayList<String>();
-        puntAction.add(ActionType.CONTROLLER.toString());
-
-        FlowConfig allowARP = new FlowConfig();
-        allowARP.setInstallInHw(true);
-        allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
-        allowARP.setPriority("1");
-        allowARP.setNode(node);
-        allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase());
-        allowARP.setActions(puntAction);
-        defaultConfigs.add(allowARP);
-
-        FlowConfig allowLLDP = new FlowConfig();
-        allowLLDP.setInstallInHw(true);
-        allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
-        allowLLDP.setPriority("1");
-        allowLLDP.setNode(node);
-        allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase());
-        allowLLDP.setActions(puntAction);
-        defaultConfigs.add(allowLLDP);
-
-        List<String> dropAction = new ArrayList<String>();
-        dropAction.add(ActionType.DROP.toString());
-
-        FlowConfig dropAllConfig = new FlowConfig();
-        dropAllConfig.setInstallInHw(true);
-        dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + FlowConfig.INTERNALSTATICFLOWEND);
-        dropAllConfig.setPriority("0");
-        dropAllConfig.setNode(node);
-        dropAllConfig.setActions(dropAction);
-        defaultConfigs.add(dropAllConfig);
-
-        log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
-        for (FlowConfig fc : defaultConfigs) {
-            Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
-            if (status.isSuccess()) {
-                log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
-            } else {
-                log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), fc.getName());
+    public void modeChangeNotify(final Node node, final boolean proactive) {
+        Callable<Status> modeChangeCallable = new Callable<Status>() {
+            @Override
+            public Status call() throws Exception {
+                List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
+
+                List<String> puntAction = new ArrayList<String>();
+                puntAction.add(ActionType.CONTROLLER.toString());
+
+                FlowConfig allowARP = new FlowConfig();
+                allowARP.setInstallInHw(true);
+                allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
+                allowARP.setPriority("1");
+                allowARP.setNode(node);
+                allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue())
+                        .toUpperCase());
+                allowARP.setActions(puntAction);
+                defaultConfigs.add(allowARP);
+
+                FlowConfig allowLLDP = new FlowConfig();
+                allowLLDP.setInstallInHw(true);
+                allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
+                allowLLDP.setPriority("1");
+                allowLLDP.setNode(node);
+                allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue())
+                        .toUpperCase());
+                allowLLDP.setActions(puntAction);
+                defaultConfigs.add(allowLLDP);
+
+                List<String> dropAction = new ArrayList<String>();
+                dropAction.add(ActionType.DROP.toString());
+
+                FlowConfig dropAllConfig = new FlowConfig();
+                dropAllConfig.setInstallInHw(true);
+                dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop"
+                        + FlowConfig.INTERNALSTATICFLOWEND);
+                dropAllConfig.setPriority("0");
+                dropAllConfig.setNode(node);
+                dropAllConfig.setActions(dropAction);
+                defaultConfigs.add(dropAllConfig);
+
+                log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
+                for (FlowConfig fc : defaultConfigs) {
+                    Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
+                    if (status.isSuccess()) {
+                        log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+                    } else {
+                        log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
+                                fc.getName());
+                    }
+                }
+                return new Status(StatusCode.SUCCESS);
             }
-        }
+        };
+
+        /*
+         * Execute the work outside the caller context, this could be an
+         * expensive operation and we don't want to block the caller for it.
+         */
+        this.executor.submit(modeChangeCallable);
     }
 
     /**
@@ -2480,6 +2462,14 @@ public class ForwardingRulesManager implements
                             } else {
                                 log.warn("Not expected null WorkStatus", work);
                             }
+                        }  else if (event instanceof ContainerFlowChangeEvent) {
+                            /*
+                             * Whether it is an addition or removal, we have to
+                             * recompute the merged flows entries taking into
+                             * account all the current container flows because
+                             * flow merging is not an injective function
+                             */
+                            updateFlowsContainerFlow();
                         } else {
                             log.warn("Dequeued unknown event {}", event.getClass()
                                     .getSimpleName());
@@ -2578,12 +2568,8 @@ public class ForwardingRulesManager implements
         }
         log.trace("Container {}: Updating installed flows because of container flow change: {} {}",
                 container.getName(), t, current);
-        /*
-         * Whether it is an addition or removal, we have to recompute the merged
-         * flows entries taking into account all the current container flows
-         * because flow merging is not an injective function
-         */
-        updateFlowsContainerFlow();
+        ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t);
+        pendingEvents.offer(ev);
     }
 
     @Override
@@ -2751,6 +2737,30 @@ public class ForwardingRulesManager implements
             return newEntry;
         }
     }
+    private class ContainerFlowChangeEvent extends FRMEvent {
+        private final ContainerFlow previous;
+        private final ContainerFlow current;
+        private final UpdateType type;
+
+        public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) {
+            this.previous = previous;
+            this.current = current;
+            this.type = type;
+        }
+
+        public ContainerFlow getPrevious() {
+            return this.previous;
+        }
+
+        public ContainerFlow getCurrent() {
+            return this.current;
+        }
+
+        public UpdateType getType() {
+            return this.type;
+        }
+    }
+
 
     private class WorkStatusCleanup extends FRMEvent {
         private FlowEntryDistributionOrder fe;
@@ -3078,7 +3088,7 @@ public class ForwardingRulesManager implements
                 return;
             }
             Node n = fei.getNode();
-            if (connectionManager.isLocal(n)) {
+            if (connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
                 logsync.trace("workOrder for fe {} processed locally", fe);
                 // I'm the controller in charge for the request, queue it for
                 // processing