Fixes for MD-FRM 16/3116/12
authorEd Warnicke <eaw@cisco.com>
Wed, 27 Nov 2013 02:30:52 +0000 (18:30 -0800)
committerEd Warnicke <eaw@cisco.com>
Wed, 27 Nov 2013 12:07:25 +0000 (04:07 -0800)
Patch 1:  Fix MD-FRM to listen to the correct portion of the data tree.
Currently we listen to /flows/flow/ for which we would need to specify a
key to indicate which flow in the tree we wanted to listen to.  Changing to
listen to /flows/ and thus hear commits involving all flows.

Patch 2:  Unpack Entry... we need the value of the entry which is a DataObject.
Otherwise, the cast fails.

Patch 3: NodeFlow is only used as an input to rpcs towards the
openflowplugin, it does not get filed in the data tree.  Flow gets
filed in the data tree.  Addtionally, commits can come in with multiple
objects, so if we get a commit with other things, we either need to ignore
them or handle them... but blindly casting them will not work.

Patch 4: Change to cope with moving from NodeFlow to Flow.  Fortunately,
NodeFlow is just Flow plus a NodeRef plus a TableRef, so it pretty much just
worked (at least at the typing level... not to running yet).

Patch 5: Fixed an NPE by providing a Collections.<RpcError>emptySet() for
RpcErrors instead of null.

Patch 6: Fixed a failure due to attempt to check with the old AD-SAL
infra for the Node.  This will not work correctly.  Added TODO for
validation of the node.  Also cleaned up substantially handling of
error conditions while I was in the neighborhood.

Patch 7: Something in the current use of ClusteringServices for
validation is breaking things.  At first it looked like
we were just trying to allocate caches before we initialized
out clustering services variable, and thus failing... but at
the end of the day there is more wrong here than make sense
to debug at this time... so commented out.

Patch 8: Get the modifications from the transaction and
deal with the create, updated, removed.  Also handled the
cast to Flow.  Put a small null check around handling of
instructions in FRMUtil.

Patch 9: Something in the current use of ClusteringServices
is breaking things. Commenting out in order to get things working.

Patch 10: Correctly sorted out the creates from the removes
from the *true* updates.  Note, both creates and updates appear
as updates... so you have to do a little setwise subtraction.
Also note, the Collections you get back are immutable, so
you have to do that subtraction with care.

Victory!  As of Patch 10 I can confirm using
https://git.opendaylight.org/gerrit/#/c/3118/

That flows being added, removed, or updated are being
correctly wired to an RPC that is registered for them.

Change-Id: Iee020938d201f3df0629b2df13a85ad023c808ac
Signed-off-by: Ed Warnicke <eaw@cisco.com>
opendaylight/md-sal/forwardingrules-manager/pom.xml
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/forwardingrulesmanager/consumer/impl/FRMUtil.java
opendaylight/md-sal/forwardingrules-manager/src/main/java/org/opendaylight/controller/forwardingrulesmanager/consumer/impl/FlowConsumerImpl.java

index b5f5402..597483e 100644 (file)
       <artifactId>model-flow-management</artifactId>
       <version>1.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-binding-broker-impl</artifactId>
index 522b096..ae6ce2f 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.acti
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.SetTpSrcAction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.SetVlanIdAction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.SetVlanPcpAction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Instructions;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.instruction.ApplyActions;
@@ -57,7 +58,7 @@ public class FRMUtil {
 
     }
 
-    public static boolean validateMatch(NodeFlow flow) {
+    public static boolean validateMatch(Flow flow) {
         Match match = flow.getMatch();
         if (match != null) {
             EthernetMatch ethernetmatch = match.getEthernetMatch();
@@ -198,9 +199,12 @@ public class FRMUtil {
         return true;
     }
 
-    public static boolean validateInstructions(NodeFlow flow) {
+    public static boolean validateInstructions(Flow flow) {
         List<Instruction> instructionsList = new ArrayList<>();
         Instructions instructions = flow.getInstructions();
+        if( instructions == null ) {
+            return false;
+        }
         instructionsList = instructions.getInstruction();
 
         for (Instruction instruction : instructionsList) {
index 2ffe0ec..9d2a6a0 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -20,12 +22,7 @@ import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.Data
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.ServiceHelper;
-import org.opendaylight.controller.sal.utils.Status;
-import org.opendaylight.controller.sal.utils.StatusCode;
-import org.opendaylight.controller.switchmanager.ISwitchManager;
-import org.opendaylight.controller.switchmanager.Switch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
@@ -54,6 +51,7 @@ import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,8 +84,7 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
     private boolean inContainerMode; // being used by global instance only
 
     public FlowConsumerImpl() {
-        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).child(Flow.class)
-                .toInstance();
+        InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
 
         if (null == flowService) {
@@ -116,12 +113,11 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
         }
         // addFlowTest();
         System.out.println("-------------------------------------------------------------------");
-        allocateCaches();
         commitHandler = new FlowDataCommitHandler();
         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
                 IClusterContainerServices.class, this);
-        container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
+        allocateCaches();
         /*
          * If we are not the first cluster node to come up, do not initialize
          * the static flow entries ordinal
@@ -204,13 +200,20 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
 
         // updating the staticflow cache
+        /*
+         *  Commented out... as in many other places... use of ClusteringServices is breaking things
+         *  insufficient time to debug
         Integer ordinal = staticFlowsOrdinal.get(0);
         staticFlowsOrdinal.put(0, ++ordinal);
         staticFlows.put(ordinal, dataObject);
+        */
 
         // We send flow to the sounthbound plugin
         flowService.addFlow(input.build());
+        /*
+         * Commented out as this will also break due to improper use of ClusteringServices
         updateLocalDatabase((NodeFlow) dataObject, true);
+        */
     }
 
     /**
@@ -237,13 +240,19 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
 
         // updating the staticflow cache
+        /*
+         * Commented out due to problems caused by improper use of ClusteringServices
         Integer ordinal = staticFlowsOrdinal.get(0);
         staticFlowsOrdinal.put(0, ++ordinal);
         staticFlows.put(ordinal, dataObject);
+        */
 
         // We send flow to the sounthbound plugin
         flowService.removeFlow(input.build());
+        /*
+         * Commented out due to problems caused by improper use of ClusteringServices
         updateLocalDatabase((NodeFlow) dataObject, false);
+        */
     }
 
     /**
@@ -257,33 +266,60 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
         updatedflowbuilder.fieldsFrom(dataObject);
+        input.setNode(dataObject.getNode());
         input.setUpdatedFlow(updatedflowbuilder.build());
 
         // updating the staticflow cache
+        /*
+         * Commented out due to problems caused by improper use of ClusteringServices.
         Integer ordinal = staticFlowsOrdinal.get(0);
         staticFlowsOrdinal.put(0, ++ordinal);
         staticFlows.put(ordinal, dataObject);
+        */
 
         // We send flow to the sounthbound plugin
         flowService.updateFlow(input.build());
+        /*
+         * Commented out due to problems caused by improper use of ClusteringServices.
         updateLocalDatabase((NodeFlow) dataObject, true);
+        */
     }
 
     @SuppressWarnings("unchecked")
     private void commitToPlugin(internalTransaction transaction) {
-        for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
-            System.out.println("Coming add cc in FlowDatacommitHandler");
-            addFlow(entry.getKey(), entry.getValue());
+        Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification().getCreatedConfigurationData().entrySet();
+
+        /*
+         * This little dance is because updatedEntries contains both created and modified entries
+         * The reason I created a new HashSet is because the collections we are returned are immutable.
+         */
+        Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
+        updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
+        updatedEntries.removeAll(createdEntries);
+
+        Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification().getRemovedConfigurationData();
+        transaction.getModification().getOriginalConfigurationData();
+        for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
+            if(entry.getValue() instanceof Flow) {
+                System.out.println("Coming add cc in FlowDatacommitHandler");
+                addFlow(entry.getKey(), (Flow) entry.getValue());
+            }
         }
         for (@SuppressWarnings("unused")
-        Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
-            System.out.println("Coming update cc in FlowDatacommitHandler");
-            updateFlow(entry.getKey(), entry.getValue());
+        Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
+            if(entry.getValue() instanceof Flow) {
+                System.out.println("Coming update cc in FlowDatacommitHandler");
+                updateFlow(entry.getKey(), (Flow) entry.getValue());
+            }
         }
 
-        for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
-            System.out.println("Coming remove cc in FlowDatacommitHandler");
-            removeFlow(entry.getKey(), entry.getValue());
+        for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
+            DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
+            if(removeValue instanceof Flow) {
+                System.out.println("Coming remove cc in FlowDatacommitHandler");
+                removeFlow(instanceId, (Flow) removeValue);
+
+            }
         }
 
     }
@@ -329,39 +365,42 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
 
                 // validating the DataObject
-
-                Status status = validate(container, (NodeFlow) entry);
-                if (!status.isSuccess()) {
-                    logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
-                    String error = "Invalid Configuration (" + status.getDescription() + ")";
-                    logger.error(error);
-                    return;
-                }
-                // Presence check
-                if (flowEntryExists((NodeFlow) entry)) {
-                    String error = "Entry with this name on specified table already exists";
-                    logger.warn("Entry with this name on specified table already exists: {}", entry);
-                    logger.error(error);
-                    return;
-                }
-                if (originalSwView.containsKey(entry)) {
-                    logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
-                    logger.trace("Aborting to install {}", entry);
-                    continue;
-                }
-                if (!FRMUtil.validateMatch((NodeFlow) entry)) {
-                    logger.error("Not a valid Match");
-                    return;
-                }
-                if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
-                    logger.error("Not a valid Instruction");
-                    return;
-                }
-                if (entry.getValue() instanceof Flow) {
-                    Flow flow = (Flow) entry.getValue();
-                    preparePutEntry(entry.getKey(), flow);
+                DataObject value = entry.getValue();
+                if(value instanceof Flow ) {
+                    Flow flow = (Flow)value;
+                    boolean status = validate(flow);
+                    if (!status) {
+                        return;
+                    }
+                    // Presence check
+                    /*
+                     * This is breaking due to some improper use of caches...
+                     *
+                    if (flowEntryExists(flow)) {
+                        String error = "Entry with this name on specified table already exists";
+                        logger.warn("Entry with this name on specified table already exists: {}", entry);
+                        logger.error(error);
+                        return;
+                    }
+                    if (originalSwView.containsKey(entry)) {
+                        logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
+                        logger.trace("Aborting to install {}", entry);
+                        continue;
+                    }
+                    */
+                    if (!FRMUtil.validateMatch(flow)) {
+                        logger.error("Not a valid Match");
+                        return;
+                    }
+                    if (!FRMUtil.validateInstructions(flow)) {
+                        logger.error("Not a valid Instruction");
+                        return;
+                    }
+                    /*
+                     * Commented out due to Clustering Services issues
+                     * preparePutEntry(entry.getKey(), flow);
+                     */
                 }
-
             }
 
             // removals = modification.getRemovedConfigurationData();
@@ -398,7 +437,7 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
             commitToPlugin(this);
             // We return true if internal transaction is successful.
             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
-            return Rpcs.getRpcResult(true, null, null);
+            return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
         }
 
         /**
@@ -411,58 +450,48 @@ public class FlowConsumerImpl implements IForwardingRulesManager {
             // NOOP - we did not modified any internal state during
             // requestCommit phase
             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
-            return Rpcs.getRpcResult(true, null, null);
+            return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
 
         }
 
-        public Status validate(IContainer container, NodeFlow dataObject) {
+        public boolean validate(Flow flow) {
+
+            String msg = ""; // Specific part of warn/error log
 
-            // container validation
-            Switch sw = null;
-            Node node = null;
-            String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
-            ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
-                    containerName, this);
+            boolean result  = true;
             // flow Name validation
-            if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
-                    || !dataObject.getFlowName().matches(NAMEREGEX)) {
-                return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
+            if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty()
+                    || !flow.getFlowName().matches(NAMEREGEX)) {
+                msg = "Invalid Flow name";
+                result = false;
             }
             // Node Validation
-            if (dataObject.getNode() == null) {
-                return new Status(StatusCode.BADREQUEST, "Node is null");
+            if (result == true && flow.getNode() == null) {
+                msg = "Node is null";
+                result = false;
             }
 
-            if (switchManager != null) {
-                for (Switch device : switchManager.getNetworkDevices()) {
-                    node = (Node) device.getNode();
-                    if (device.getNode().equals(dataObject.getNode())) {
-                        sw = device;
-                        break;
-                    }
-                }
-                if (sw == null) {
-                    return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
-                }
-            } else {
-                logger.debug("switchmanager is not set yet");
-            }
+            // TODO: Validate we are seeking to program a flow against a valid Node
 
-            if (dataObject.getPriority() != null) {
-                if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
-                    return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
-                            dataObject.getPriority()));
+            if (result == true && flow.getPriority() != null) {
+                if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
+                    msg = String.format("priority %s is not in the range 0 - 65535",
+                            flow.getPriority());
+                    result = false;
                 }
             }
-
-            return new Status(StatusCode.SUCCESS);
+            if (result == false) {
+                logger.warn("Invalid Configuration for flow {}. The failure is {}",flow,msg);
+                logger.error("Invalid Configuration ({})",msg);
+            }
+            return result;
         }
 
-        private boolean flowEntryExists(NodeFlow config) {
+        private boolean flowEntryExists(Flow flow) {
             // Flow name has to be unique on per table id basis
             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
-                if (entry.getValue().getFlowName().equals(config.getFlowName())
-                        && entry.getValue().getTableId().equals(config.getTableId())) {
+                if (entry.getValue().getFlowName().equals(flow.getFlowName())
+                        && entry.getValue().getTableId().equals(flow.getTableId())) {
                     return true;
                 }
             }