Merge "Move init and destroy empty impl from Activator classes. Have only one empty...
[controller.git] / opendaylight / forwardingrulesmanager / implementation / src / main / java / org / opendaylight / controller / forwardingrulesmanager / internal / ForwardingRulesManager.java
index 58d23655cac5a17353989c5cfd36d71a7a14ea74..e9a56d6113c1502447d5956b5bd6236e185ac26b 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.connectionmanager.ConnectionLocality;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
@@ -56,6 +57,7 @@ import org.opendaylight.controller.sal.action.Controller;
 import org.opendaylight.controller.sal.action.Flood;
 import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.IContainerListener;
@@ -101,7 +103,7 @@ public class ForwardingRulesManager implements
         IConfigurationContainerAware,
         IInventoryListener,
         IObjectReader,
-        ICacheUpdateAware,
+        ICacheUpdateAware<Object,Object>,
         CommandProvider,
         IFlowProgrammerListener {
     private static final String NODEDOWN = "Node is Down";
@@ -201,59 +203,6 @@ public class ForwardingRulesManager implements
     private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
             new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
 
-    /*
-     * Create an executor pool to create the distributionOrder, this is a stop
-     * gap solution caused by an issue with non-transactional caches in the
-     * implementation we use, being currently worked on. It has been noticed in
-     * fact that when non-transactional caches are being used sometime the key
-     * are no distributed to all the nodes properly. To workaround the issue
-     * transactional caches are being used, but there was a reason for using
-     * non-transactional caches to start with, in fact we needed to be able in
-     * the context of a northbound transaction to program the FRM entries
-     * irrespective of the fact that transaction would commit or no else we
-     * would not be able to achieve the entry programming and implement the
-     * scheme for recovery from network element failures. Bottom line, now in
-     * order to make sure an update on a transactional cache goes out while in a
-     * transaction that need to be initiated by a different thread.
-     */
-    private ExecutorService executor;
-
-    class DistributeOrderCallable implements Callable<Future<Status>> {
-        private FlowEntryInstall e;
-        private FlowEntryInstall u;
-        private UpdateType t;
-        DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
-            this.e = e;
-            this.u = u;
-            this.t = t;
-        }
-
-        @Override
-        public Future<Status> call() throws Exception {
-            if (e == null || t == null) {
-                logsync.error("Unexpected null Entry up update type");
-                return null;
-            }
-            // Create the work order and distribute it
-            FlowEntryDistributionOrder fe =
-                    new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
-            // First create the monitor job
-            FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
-            logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
-            workMonitor.put(fe, ret);
-            if (t.equals(UpdateType.CHANGED)) {
-                // Then distribute the work
-                workOrder.put(fe, u);
-            } else {
-                // Then distribute the work
-                workOrder.put(fe, e);
-            }
-            logsync.trace("WorkOrder requested");
-            // Now create an Handle to monitor the execution of the operation
-            return ret;
-        }
-    }
-
     /**
      * @param e
      *            Entry being installed/updated/removed
@@ -273,25 +222,27 @@ public class ForwardingRulesManager implements
         }
 
         Node n = e.getNode();
-        if (!connectionManager.isLocal(n)) {
-            Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
-            if (worker != null) {
-                Future<Future<Status>> workerRes = this.executor.submit(worker);
-                try {
-                    return workerRes.get();
-                } catch (InterruptedException e1) {
-                    // we where interrupted, not a big deal.
-                    return null;
-                } catch (ExecutionException e1) {
-                    logsync.error(
-                            "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
-                            e);
-                    return null;
-                }
+        if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) {
+            // Create the work order and distribute it
+            FlowEntryDistributionOrder fe =
+                    new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+            // First create the monitor job
+            FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+            logsync.trace("Node {} not local so sending fe {}", n, fe);
+            workMonitor.put(fe, ret);
+            if (t.equals(UpdateType.CHANGED)) {
+                // Then distribute the work
+                workOrder.put(fe, u);
+            } else {
+                // Then distribute the work
+                workOrder.put(fe, e);
             }
+            logsync.trace("WorkOrder requested");
+            // Now create an Handle to monitor the execution of the operation
+            return ret;
         }
 
-        logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
+        logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t);
         return null;
     }
 
@@ -1362,7 +1313,6 @@ public class ForwardingRulesManager implements
         retrieveCaches();
     }
 
-    @SuppressWarnings("deprecation")
     private void allocateCaches() {
         if (this.clusterContainerService == null) {
             log.warn("Un-initialized clusterContainerService, can't create cache");
@@ -1406,10 +1356,10 @@ public class ForwardingRulesManager implements
                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
 
             clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
 
         } catch (CacheConfigException cce) {
             log.error("CacheConfigException");
@@ -1418,7 +1368,7 @@ public class ForwardingRulesManager implements
         }
     }
 
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     private void retrieveCaches() {
         ConcurrentMap<?, ?> map;
 
@@ -1867,9 +1817,13 @@ public class ForwardingRulesManager implements
             }
         }
         if (target != null) {
-            // Program the network node
-            Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
-                    .installFlowEntry(target.getFlowEntry());
+            Status status = target.validate(container);
+            if (!status.isSuccess()) {
+                log.warn(status.getDescription());
+                return status;
+            }
+            status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this
+                                    .installFlowEntry(target.getFlowEntry());
             if (status.isSuccess()) {
                 // Update Configuration database
                 target.setStatus(SUCCESS);
@@ -2107,6 +2061,7 @@ public class ForwardingRulesManager implements
      *      pratice to have in it's context operations that can take time,
      *      hence moving off to a different thread for async processing.
      */
+    private ExecutorService executor;
     @Override
     public void modeChangeNotify(final Node node, final boolean proactive) {
         Callable<Status> modeChangeCallable = new Callable<Status>() {
@@ -2219,7 +2174,99 @@ public class ForwardingRulesManager implements
 
     @Override
     public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map<String, Property> propMap) {
+        boolean updateStaticFlowCluster = false;
+
+        switch (type) {
+        case ADDED:
+            break;
+        case CHANGED:
+            Config config = (propMap == null) ? null : (Config) propMap.get(Config.ConfigPropName);
+            if (config != null) {
+                switch (config.getValue()) {
+                case Config.ADMIN_DOWN:
+                    log.trace("Port {} is administratively down: uninstalling interested flows", nodeConnector);
+                    updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+                    break;
+                case Config.ADMIN_UP:
+                    log.trace("Port {} is administratively up: installing interested flows", nodeConnector);
+                    updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nodeConnector);
+                    break;
+                case Config.ADMIN_UNDEF:
+                    break;
+                default:
+                }
+            }
+            break;
+        case REMOVED:
+            // This is the case where a switch port is removed from the SDN agent space
+            log.trace("Port {} was removed from our control: uninstalling interested flows", nodeConnector);
+            updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector);
+            break;
+        default:
+
+        }
+
+        if (updateStaticFlowCluster) {
+            refreshClusterStaticFlowsStatus(nodeConnector.getNode());
+        }
+    }
+
+    /*
+     * It goes through the static flows configuration, it identifies the ones
+     * which have the specified node connector as input or output port and
+     * install them on the network node if they are marked to be installed in
+     * hardware and their status shows they were not installed yet
+     */
+    private boolean installFlowsOnNodeConnectorUp(NodeConnector nodeConnector) {
+        boolean updated = false;
+        List<FlowConfig> flowConfigForNode = getStaticFlows(nodeConnector.getNode());
+        for (FlowConfig flowConfig : flowConfigForNode) {
+            if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) {
+                if (flowConfig.installInHw() && !flowConfig.getStatus().equals(SUCCESS)) {
+                    Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+                    if (!status.isSuccess()) {
+                        flowConfig.setStatus(status.getDescription());
+                    } else {
+                        flowConfig.setStatus(SUCCESS);
+                    }
+                    updated = true;
+                }
+            }
+        }
+        return updated;
+    }
 
+    /*
+     * Remove from the network node all the flows which have the specified node
+     * connector as input or output port. If any of the flow entry is a static
+     * flow, it updates the correspondent configuration.
+     */
+    private boolean removeFlowsOnNodeConnectorDown(NodeConnector nodeConnector) {
+        boolean updated = false;
+        List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nodeConnector.getNode());
+        if (nodeFlowEntries == null) {
+            return updated;
+        }
+        for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
+            if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nodeConnector)) {
+                Status status = this.removeEntryInternal(fei, true);
+                if (!status.isSuccess()) {
+                    continue;
+                }
+                /*
+                 * If the flow entry is a static flow, then update its
+                 * configuration
+                 */
+                if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
+                    FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
+                    if (flowConfig != null) {
+                        flowConfig.setStatus(PORTREMOVED);
+                        updated = true;
+                    }
+                }
+            }
+        }
+        return updated;
     }
 
     private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) {
@@ -2507,6 +2554,14 @@ public class ForwardingRulesManager implements
                             } else {
                                 log.warn("Not expected null WorkStatus", work);
                             }
+                        }  else if (event instanceof ContainerFlowChangeEvent) {
+                            /*
+                             * Whether it is an addition or removal, we have to
+                             * recompute the merged flows entries taking into
+                             * account all the current container flows because
+                             * flow merging is not an injective function
+                             */
+                            updateFlowsContainerFlow();
                         } else {
                             log.warn("Dequeued unknown event {}", event.getClass()
                                     .getSimpleName());
@@ -2605,12 +2660,8 @@ public class ForwardingRulesManager implements
         }
         log.trace("Container {}: Updating installed flows because of container flow change: {} {}",
                 container.getName(), t, current);
-        /*
-         * Whether it is an addition or removal, we have to recompute the merged
-         * flows entries taking into account all the current container flows
-         * because flow merging is not an injective function
-         */
-        updateFlowsContainerFlow();
+        ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t);
+        pendingEvents.offer(ev);
     }
 
     @Override
@@ -2623,57 +2674,22 @@ public class ForwardingRulesManager implements
 
         switch (t) {
         case REMOVED:
-
-            List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nc.getNode());
-            if (nodeFlowEntries == null) {
-                return;
-            }
-            for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
-                if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) {
-                    Status status = this.removeEntryInternal(fei, true);
-                    if (!status.isSuccess()) {
-                        continue;
-                    }
-                    /*
-                     * If the flow entry is a static flow, then update its
-                     * configuration
-                     */
-                    if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
-                        FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
-                        if (flowConfig != null) {
-                            flowConfig.setStatus(PORTREMOVED);
-                            updateStaticFlowCluster = true;
-                        }
-                    }
-                }
-            }
-            if (updateStaticFlowCluster) {
-                refreshClusterStaticFlowsStatus(nc.getNode());
-            }
+            log.trace("Port {} was removed from container: uninstalling interested flows", nc);
+            updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nc);
             break;
         case ADDED:
-            List<FlowConfig> flowConfigForNode = getStaticFlows(nc.getNode());
-            for (FlowConfig flowConfig : flowConfigForNode) {
-                if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) {
-                    if (flowConfig.installInHw()) {
-                        Status status = this.installFlowEntry(flowConfig.getFlowEntry());
-                        if (!status.isSuccess()) {
-                            flowConfig.setStatus(status.getDescription());
-                        } else {
-                            flowConfig.setStatus(SUCCESS);
-                        }
-                        updateStaticFlowCluster = true;
-                    }
-                }
-            }
-            if (updateStaticFlowCluster) {
-                refreshClusterStaticFlowsStatus(nc.getNode());
-            }
+            log.trace("Port {} was added to container: reinstall interested flows", nc);
+            updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nc);
+
             break;
         case CHANGED:
             break;
         default:
         }
+
+        if (updateStaticFlowCluster) {
+            refreshClusterStaticFlowsStatus(nc.getNode());
+        }
     }
 
     @Override
@@ -2778,6 +2794,30 @@ public class ForwardingRulesManager implements
             return newEntry;
         }
     }
+    private class ContainerFlowChangeEvent extends FRMEvent {
+        private final ContainerFlow previous;
+        private final ContainerFlow current;
+        private final UpdateType type;
+
+        public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) {
+            this.previous = previous;
+            this.current = current;
+            this.type = type;
+        }
+
+        public ContainerFlow getPrevious() {
+            return this.previous;
+        }
+
+        public ContainerFlow getCurrent() {
+            return this.current;
+        }
+
+        public UpdateType getType() {
+            return this.type;
+        }
+    }
+
 
     private class WorkStatusCleanup extends FRMEvent {
         private FlowEntryDistributionOrder fe;
@@ -3105,7 +3145,7 @@ public class ForwardingRulesManager implements
                 return;
             }
             Node n = fei.getNode();
-            if (connectionManager.isLocal(n)) {
+            if (connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
                 logsync.trace("workOrder for fe {} processed locally", fe);
                 // I'm the controller in charge for the request, queue it for
                 // processing