From 8cfa4cc8fc4a722d910160c0cff6276b1f455d31 Mon Sep 17 00:00:00 2001 From: Shigeru Yasuda Date: Thu, 16 Jun 2016 15:58:53 +0900 Subject: [PATCH] Bug 6073: Wait for completion of add-group RPCs as needed. We need to wait for completion of add-group RPCs that install groups referenced from other flows/groups. Additional changes: * Fixed NPE in group reconciliation caused by null buckets. Change-Id: Iad9bdf62ecdb189fe5e6080085f2ab7cc3131396 Signed-off-by: Shigeru Yasuda Signed-off-by: Shuva Kar --- .../frm/ForwardingRulesCommiter.java | 4 +- .../applications/frm/impl/FlowForwarder.java | 14 +- .../frm/impl/FlowNodeReconciliationImpl.java | 149 +++++++++++++++--- .../applications/frm/impl/GroupForwarder.java | 8 +- .../applications/frm/impl/MeterForwarder.java | 8 +- .../applications/frm/impl/TableForwarder.java | 9 +- 6 files changed, 159 insertions(+), 33 deletions(-) diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java index 44d33ca6b4..8f099b60ed 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/ForwardingRulesCommiter.java @@ -57,8 +57,10 @@ public interface ForwardingRulesCommiter extends AutoClos * @param identifier - the whole path to new DataObject * @param add - new DataObject * @param nodeIdent Node InstanceIdentifier + * @return A future associated with RPC task. {@code null} is set to the + * future if this method does not invoke RPC. */ - void add(InstanceIdentifier identifier, D add, + Future add(InstanceIdentifier identifier, D add, InstanceIdentifier nodeIdent); diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java index 5d939a9662..aff5cea0dd 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowForwarder.java @@ -31,6 +31,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput; @@ -178,10 +179,11 @@ public class FlowForwarder extends AbstractListeningCommiter { } @Override - public void add(final InstanceIdentifier identifier, - final Flow addDataObj, - final InstanceIdentifier nodeIdent) { + public Future> add( + final InstanceIdentifier identifier, final Flow addDataObj, + final InstanceIdentifier nodeIdent) { + Future> future; final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class); if (tableIdValidationPrecondition(tableKey, addDataObj)) { final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj); @@ -190,8 +192,12 @@ public class FlowForwarder extends AbstractListeningCommiter { builder.setFlowRef(new FlowRef(identifier)); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - provider.getSalFlowService().addFlow(builder.build()); + future = provider.getSalFlowService().addFlow(builder.build()); + } else { + future = Futures.>immediateFuture(null); } + + return future; } @Override diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java index a83502d797..c3f13942cf 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java @@ -8,6 +8,7 @@ package org.opendaylight.openflowplugin.applications.frm.impl; + import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -16,17 +17,25 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.ListIterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.HashMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; + import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; @@ -56,6 +65,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; @@ -90,6 +100,18 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class); + /** + * The number of nanoseconds to wait for a single group to be added. + */ + private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3); + + /** + * The maximum number of nanoseconds to wait for completion of add-group + * RPCs. + */ + private static final long MAX_ADD_GROUP_TIMEOUT = + TimeUnit.SECONDS.toNanos(20); + private final DataBroker dataBroker; private final ForwardingRulesManager provider; @@ -248,9 +270,8 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { ReadOnlyTransaction trans = provider.getReadTranaction(); Optional flowNode = Optional.absent(); - AtomicInteger counter = new AtomicInteger(); //initialize the counter - counter.set(0); + int counter = 0; try { flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get(); } catch (Exception e) { @@ -274,12 +295,12 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { ? flowNode.get().getGroup() : Collections.emptyList(); List toBeInstalledGroups = new ArrayList<>(); toBeInstalledGroups.addAll(groups); - List alreadyInstalledGroupids = new ArrayList<>(); //new list for suspected groups pointing to ports .. when the ports come up late List suspectedGroups = new ArrayList<>(); + Map> groupFutures = new HashMap<>(); while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) && - (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold + (counter <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) { LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString()); @@ -291,8 +312,18 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { while (iterator.hasNext()) { Group group = iterator.next(); boolean okToInstall = true; - for (Bucket bucket : group.getBuckets().getBucket()) { - for (Action action : bucket.getAction()) { + Buckets buckets = group.getBuckets(); + List bucketList = (buckets == null) + ? null : buckets.getBucket(); + if (bucketList == null) { + bucketList = Collections.emptyList(); + } + for (Bucket bucket : bucketList) { + List actions = bucket.getAction(); + if (actions == null) { + actions = Collections.emptyList(); + } + for (Action action : actions) { //chained-port if (action.getAction().getImplementedInterface().getName() .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) { @@ -322,30 +353,30 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { else if (action.getAction().getImplementedInterface().getName() .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) { Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId(); - if (!alreadyInstalledGroupids.contains(groupId)) { + ListenableFuture future = + groupFutures.get(groupId); + if (future == null) { okToInstall = false; break; } + + // Need to ensure that the group specified + // by group-action is already installed. + awaitGroup(sNode, future); } } if (!okToInstall) { //increment retry counter value - counter.incrementAndGet(); + counter++; break; } - - } - if (okToInstall) { - final KeyedInstanceIdentifier groupIdent = - nodeIdentity.child(Group.class, group.getKey()); - provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); - alreadyInstalledGroupids.add(group.getGroupId().getValue()); + addGroup(groupFutures, group); iterator.remove(); // resetting the counter to zero - counter.set(0); + counter = 0; } } } @@ -355,9 +386,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { for (Group group : toBeInstalledGroups) { LOG.error("Installing the group {} finally although the port is not up after checking for {} times " , group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount()); - final KeyedInstanceIdentifier groupIdent = - nodeIdentity.child(Group.class, group.getKey()); - provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); + addGroup(groupFutures, group); } } /* Meters */ @@ -368,6 +397,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { nodeIdentity.child(Meter.class, meter.getKey()); provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); } + + // Need to wait for all groups to be installed before adding + // flows. + awaitGroups(sNode, groupFutures.values()); + /* Flows */ List tables = flowNode.get().getTable() != null ? flowNode.get().getTable() : Collections.
emptyList(); @@ -385,8 +419,85 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { /* clean transaction */ trans.close(); } + + /** + * Invoke add-group RPC, and put listenable future associated with the + * RPC into the given map. + * + * @param map The map to store listenable futures associated with + * add-group RPC. + * @param group The group to add. + */ + private void addGroup(Map> map, Group group) { + KeyedInstanceIdentifier groupIdent = + nodeIdentity.child(Group.class, group.getKey()); + final Long groupId = group.getGroupId().getValue(); + ListenableFuture future = JdkFutureAdapters.listenInPoolThread( + provider.getGroupCommiter().add( + groupIdent, group, nodeIdentity)); + + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Object result) { + if (LOG.isTraceEnabled()) { + LOG.trace("add-group RPC completed: node={}, id={}", + nodeIdentity.firstKeyOf(Node.class).getId(). + getValue(), groupId); + } + } + + @Override + public void onFailure(Throwable cause) { + String msg = "add-group RPC failed: node=" + + nodeIdentity.firstKeyOf(Node.class).getId().getValue() + + ", id=" + groupId; + LOG.error(msg, cause); + } + }); + + map.put(groupId, future); + } + + /** + * Wait for completion of add-group RPC. + * + * @param nodeId The identifier for the target node. + * @param future Future associated with add-group RPC that installs + * the target group. + */ + private void awaitGroup(String nodeId, ListenableFuture future) { + awaitGroups(nodeId, Collections.singleton(future)); + } + + /** + * Wait for completion of add-group RPCs. + * + * @param nodeId The identifier for the target node. + * @param futures A collection of futures associated with add-group + * RPCs. + */ + private void awaitGroups(String nodeId, + Collection> futures) { + if (!futures.isEmpty()) { + long timeout = Math.min( + ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT); + try { + Futures.successfulAsList(futures). + get(timeout, TimeUnit.NANOSECONDS); + LOG.trace("awaitGroups() completed: node={}", nodeId); + } catch (TimeoutException e) { + LOG.warn("add-group RPCs did not complete: node={}", + nodeId); + } catch (Exception e) { + LOG.error("Unhandled exception while waiting for group installation on node {}", + nodeId, e); + } + } + } } + private BigInteger getDpnIdFromNodeName(String nodeName) { + String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); return new BigInteger(dpId); } diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java index 7f6d48c902..94c147dbe0 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/GroupForwarder.java @@ -23,6 +23,7 @@ import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder; @@ -145,8 +146,9 @@ public class GroupForwarder extends AbstractListeningCommiter { } @Override - public void add(final InstanceIdentifier identifier, final Group addDataObj, - final InstanceIdentifier nodeIdent) { + public Future> add( + final InstanceIdentifier identifier, final Group addDataObj, + final InstanceIdentifier nodeIdent) { final Group group = (addDataObj); final AddGroupInputBuilder builder = new AddGroupInputBuilder(group); @@ -154,7 +156,7 @@ public class GroupForwarder extends AbstractListeningCommiter { builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setGroupRef(new GroupRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - this.provider.getSalGroupService().addGroup(builder.build()); + return this.provider.getSalGroupService().addGroup(builder.build()); } @Override diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java index 09073dcde2..b731606700 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/MeterForwarder.java @@ -30,6 +30,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder; @@ -141,15 +142,16 @@ public class MeterForwarder extends AbstractListeningCommiter { } @Override - public void add(final InstanceIdentifier identifier, final Meter addDataObj, - final InstanceIdentifier nodeIdent) { + public Future> add( + final InstanceIdentifier identifier, final Meter addDataObj, + final InstanceIdentifier nodeIdent) { final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setMeterRef(new MeterRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); - this.provider.getSalMeterService().addMeter(builder.build()); + return this.provider.getSalMeterService().addMeter(builder.build()); } @Override diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java index 3a09abd34b..a579ee4d1f 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/TableForwarder.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.applications.frm.impl; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -122,9 +123,11 @@ public class TableForwarder extends AbstractListeningCommiter { } @Override - public void add(final InstanceIdentifier identifier, final TableFeatures addDataObj, - final InstanceIdentifier nodeIdent) { - //DO NOthing + public Future add( + final InstanceIdentifier identifier, + final TableFeatures addDataObj, + final InstanceIdentifier nodeIdent) { + return Futures.immediateFuture(null); } @Override -- 2.36.6