Bug 6073: Wait for completion of add-group RPCs as needed. 29/40429/7
authorShigeru Yasuda <s-yasuda@da.jp.nec.com>
Thu, 16 Jun 2016 06:58:53 +0000 (15:58 +0900)
committerShuva Kar <shuva.jyoti.kar@ericsson.com>
Fri, 22 Jul 2016 07:48:49 +0000 (13:18 +0530)
We need to wait for completion of add-group RPCs that install groups
referenced from other flows/groups.

Additional changes:

  * Fixed NPE in group reconciliation caused by null buckets.

Change-Id: Iad9bdf62ecdb189fe5e6080085f2ab7cc3131396
Signed-off-by: Shigeru Yasuda <s-yasuda@da.jp.nec.com>
Signed-off-by: Shuva Kar <shuva.jyoti.kar@ericsson.com>
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java
applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java

index 44d33ca6b475ccdcb6e44d28129454a83cb32b60..8f099b60ed71bb244b3c0b1248dcd53f6a6101a7 100644 (file)
@@ -57,8 +57,10 @@ public interface ForwardingRulesCommiter <D extends DataObject> extends AutoClos
      * @param identifier - the whole path to new DataObject
      * @param add - new DataObject
      * @param nodeIdent Node InstanceIdentifier
+     * @return A future associated with RPC task. {@code null} is set to the
+     *         future if this method does not invoke RPC.
      */
-    void add(InstanceIdentifier<D> identifier, D add,
+    Future<? extends RpcResult> add(InstanceIdentifier<D> identifier, D add,
             InstanceIdentifier<FlowCapableNode> nodeIdent);
 
 
index 5d939a96624d14a3cc4392e8d8887d62c0c8e2f1..aff5cea0dd30260b3e9efbb823d19b8ce348e7b9 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
@@ -178,10 +179,11 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
     }
 
     @Override
-    public void add(final InstanceIdentifier<Flow> identifier,
-                    final Flow addDataObj,
-                    final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+    public Future<RpcResult<AddFlowOutput>> add(
+        final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
+        final InstanceIdentifier<FlowCapableNode> nodeIdent) {
 
+        Future<RpcResult<AddFlowOutput>> future;
         final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
             final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
@@ -190,8 +192,12 @@ public class FlowForwarder extends AbstractListeningCommiter<Flow> {
             builder.setFlowRef(new FlowRef(identifier));
             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
             builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-            provider.getSalFlowService().addFlow(builder.build());
+            future = provider.getSalFlowService().addFlow(builder.build());
+        } else {
+            future = Futures.<RpcResult<AddFlowOutput>>immediateFuture(null);
         }
+
+        return future;
     }
 
     @Override
index a83502d797332abba6cc849141a10a983bebc975..c3f13942cfab3b4899a384aa2bdcd1c24c351849 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -16,17 +17,25 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
@@ -56,6 +65,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
@@ -90,6 +100,18 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
 
+    /**
+     * The number of nanoseconds to wait for a single group to be added.
+     */
+    private static final long  ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
+
+    /**
+     * The maximum number of nanoseconds to wait for completion of add-group
+     * RPCs.
+     */
+    private static final long  MAX_ADD_GROUP_TIMEOUT =
+        TimeUnit.SECONDS.toNanos(20);
+
     private final DataBroker dataBroker;
 
     private final ForwardingRulesManager provider;
@@ -248,9 +270,8 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
             ReadOnlyTransaction trans = provider.getReadTranaction();
             Optional<FlowCapableNode> flowNode = Optional.absent();
 
-            AtomicInteger counter = new AtomicInteger();
             //initialize the counter
-            counter.set(0);
+            int counter = 0;
             try {
                 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
             } catch (Exception e) {
@@ -274,12 +295,12 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         ? flowNode.get().getGroup() : Collections.<Group>emptyList();
                 List<Group> toBeInstalledGroups = new ArrayList<>();
                 toBeInstalledGroups.addAll(groups);
-                List<Long> alreadyInstalledGroupids = new ArrayList<>();
                 //new list for suspected groups pointing to ports .. when the ports come up late
                 List<Group> suspectedGroups = new ArrayList<>();
+                Map<Long, ListenableFuture<?>> groupFutures = new HashMap<>();
 
                 while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) &&
-                        (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
+                        (counter <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
 
                     if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) {
                         LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString());
@@ -291,8 +312,18 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                     while (iterator.hasNext()) {
                         Group group = iterator.next();
                         boolean okToInstall = true;
-                        for (Bucket bucket : group.getBuckets().getBucket()) {
-                            for (Action action : bucket.getAction()) {
+                        Buckets buckets = group.getBuckets();
+                        List<Bucket> bucketList = (buckets == null)
+                            ? null : buckets.getBucket();
+                        if (bucketList == null) {
+                            bucketList = Collections.<Bucket>emptyList();
+                        }
+                        for (Bucket bucket : bucketList) {
+                            List<Action> actions = bucket.getAction();
+                            if (actions == null) {
+                                actions = Collections.<Action>emptyList();
+                            }
+                            for (Action action : actions) {
                                 //chained-port
                                 if (action.getAction().getImplementedInterface().getName()
                                         .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) {
@@ -322,30 +353,30 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                                 else if (action.getAction().getImplementedInterface().getName()
                                         .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) {
                                     Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
-                                    if (!alreadyInstalledGroupids.contains(groupId)) {
+                                    ListenableFuture<?> future =
+                                        groupFutures.get(groupId);
+                                    if (future == null) {
                                         okToInstall = false;
                                         break;
                                     }
+
+                                    // Need to ensure that the group specified
+                                    // by group-action is already installed.
+                                    awaitGroup(sNode, future);
                                 }
                             }
                             if (!okToInstall) {
                                 //increment retry counter value
-                                counter.incrementAndGet();
+                                counter++;
                                 break;
                             }
-
-
                         }
 
-
                         if (okToInstall) {
-                            final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
-                                    nodeIdentity.child(Group.class, group.getKey());
-                            provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
-                            alreadyInstalledGroupids.add(group.getGroupId().getValue());
+                            addGroup(groupFutures, group);
                             iterator.remove();
                             // resetting the counter to zero
-                            counter.set(0);
+                            counter = 0;
                         }
                     }
                 }
@@ -355,9 +386,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                     for (Group group : toBeInstalledGroups) {
                         LOG.error("Installing the group {} finally although the port is not up after checking for {} times "
                                 , group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount());
-                        final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
-                                nodeIdentity.child(Group.class, group.getKey());
-                        provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
+                        addGroup(groupFutures, group);
                     }
                 }
             /* Meters */
@@ -368,6 +397,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                             nodeIdentity.child(Meter.class, meter.getKey());
                     provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
                 }
+
+                // Need to wait for all groups to be installed before adding
+                // flows.
+                awaitGroups(sNode, groupFutures.values());
+
             /* Flows */
                 List<Table> tables = flowNode.get().getTable() != null
                         ? flowNode.get().getTable() : Collections.<Table>emptyList();
@@ -385,8 +419,85 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         /* clean transaction */
             trans.close();
         }
+
+        /**
+         * Invoke add-group RPC, and put listenable future associated with the
+         * RPC into the given map.
+         *
+         * @param map        The map to store listenable futures associated with
+         *                   add-group RPC.
+         * @param group      The group to add.
+         */
+        private void addGroup(Map<Long, ListenableFuture<?>> map, Group group) {
+            KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
+                nodeIdentity.child(Group.class, group.getKey());
+            final Long groupId = group.getGroupId().getValue();
+            ListenableFuture<?> future = JdkFutureAdapters.listenInPoolThread(
+                provider.getGroupCommiter().add(
+                    groupIdent, group, nodeIdentity));
+
+            Futures.addCallback(future, new FutureCallback<Object>() {
+                @Override
+                public void onSuccess(Object result) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("add-group RPC completed: node={}, id={}",
+                                  nodeIdentity.firstKeyOf(Node.class).getId().
+                                  getValue(), groupId);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    String msg = "add-group RPC failed: node=" +
+                        nodeIdentity.firstKeyOf(Node.class).getId().getValue() +
+                        ", id=" + groupId;
+                    LOG.error(msg, cause);
+                }
+            });
+
+            map.put(groupId, future);
+        }
+
+        /**
+         * Wait for completion of add-group RPC.
+         *
+         * @param nodeId  The identifier for the target node.
+         * @param future  Future associated with add-group RPC that installs
+         *                the target group.
+         */
+        private void awaitGroup(String nodeId, ListenableFuture<?> future) {
+            awaitGroups(nodeId, Collections.singleton(future));
+        }
+
+        /**
+         * Wait for completion of add-group RPCs.
+         *
+         * @param nodeId   The identifier for the target node.
+         * @param futures  A collection of futures associated with add-group
+         *                 RPCs.
+         */
+        private void awaitGroups(String nodeId,
+                                 Collection<ListenableFuture<?>> futures) {
+            if (!futures.isEmpty()) {
+                long timeout = Math.min(
+                    ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT);
+                try {
+                    Futures.successfulAsList(futures).
+                        get(timeout, TimeUnit.NANOSECONDS);
+                    LOG.trace("awaitGroups() completed: node={}", nodeId);
+                } catch (TimeoutException e) {
+                    LOG.warn("add-group RPCs did not complete: node={}",
+                             nodeId);
+                } catch (Exception e) {
+                    LOG.error("Unhandled exception while waiting for group installation on node {}",
+                              nodeId, e);
+                }
+            }
+        }
     }
+
     private BigInteger getDpnIdFromNodeName(String nodeName) {
+
         String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
         return new BigInteger(dpId);
     }
index 7f6d48c9026d36f30e908fdd7fa9fd86979175d4..94c147dbe00f127dbf05db6598e2132cd573e88a 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
@@ -145,8 +146,9 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
     }
 
     @Override
-    public void add(final InstanceIdentifier<Group> identifier, final Group addDataObj,
-                    final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+    public Future<RpcResult<AddGroupOutput>> add(
+        final InstanceIdentifier<Group> identifier, final Group addDataObj,
+        final InstanceIdentifier<FlowCapableNode> nodeIdent) {
 
         final Group group = (addDataObj);
         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
@@ -154,7 +156,7 @@ public class GroupForwarder extends AbstractListeningCommiter<Group> {
         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setGroupRef(new GroupRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-        this.provider.getSalGroupService().addGroup(builder.build());
+        return this.provider.getSalGroupService().addGroup(builder.build());
     }
 
     @Override
index 09073dcde2806eb453e368a69e3832362eacf41c..b731606700ff7c72de3c6bee5c0a2c87b3da2ada 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
@@ -141,15 +142,16 @@ public class MeterForwarder extends AbstractListeningCommiter<Meter> {
     }
 
     @Override
-    public void add(final InstanceIdentifier<Meter> identifier, final Meter addDataObj,
-                    final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+    public Future<RpcResult<AddMeterOutput>> add(
+        final InstanceIdentifier<Meter> identifier, final Meter addDataObj,
+        final InstanceIdentifier<FlowCapableNode> nodeIdent) {
 
         final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj);
 
         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
         builder.setMeterRef(new MeterRef(identifier));
         builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
-        this.provider.getSalMeterService().addMeter(builder.build());
+        return this.provider.getSalMeterService().addMeter(builder.build());
     }
 
     @Override
index 3a09abd34b11f4e96a6b9cc6f1ed3b48d67a39fb..a579ee4d1f0621477aceaeb6d1020615b38027a8 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -122,9 +123,11 @@ public class TableForwarder extends AbstractListeningCommiter<TableFeatures> {
     }
 
     @Override
-    public void add(final InstanceIdentifier<TableFeatures> identifier, final TableFeatures addDataObj,
-                    final InstanceIdentifier<FlowCapableNode> nodeIdent) {
-        //DO NOthing
+    public Future<? extends RpcResult> add(
+        final InstanceIdentifier<TableFeatures> identifier,
+        final TableFeatures addDataObj,
+        final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        return Futures.immediateFuture(null);
     }
 
     @Override