package org.opendaylight.openflowplugin.applications.frm.impl;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
+ /**
+ * The number of nanoseconds to wait for a single group to be added.
+ */
+ private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
+
+ /**
+ * The maximum number of nanoseconds to wait for completion of add-group
+ * RPCs.
+ */
+ private static final long MAX_ADD_GROUP_TIMEOUT =
+ TimeUnit.SECONDS.toNanos(20);
+
private final DataBroker dataBroker;
private final ForwardingRulesManager provider;
ReadOnlyTransaction trans = provider.getReadTranaction();
Optional<FlowCapableNode> flowNode = Optional.absent();
- AtomicInteger counter = new AtomicInteger();
//initialize the counter
- counter.set(0);
+ int counter = 0;
try {
flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
} catch (Exception e) {
? flowNode.get().getGroup() : Collections.<Group>emptyList();
List<Group> toBeInstalledGroups = new ArrayList<>();
toBeInstalledGroups.addAll(groups);
- List<Long> alreadyInstalledGroupids = new ArrayList<>();
//new list for suspected groups pointing to ports .. when the ports come up late
List<Group> suspectedGroups = new ArrayList<>();
+ Map<Long, ListenableFuture<?>> groupFutures = new HashMap<>();
while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) &&
- (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
+ (counter <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) {
LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString());
while (iterator.hasNext()) {
Group group = iterator.next();
boolean okToInstall = true;
- for (Bucket bucket : group.getBuckets().getBucket()) {
- for (Action action : bucket.getAction()) {
+ Buckets buckets = group.getBuckets();
+ List<Bucket> bucketList = (buckets == null)
+ ? null : buckets.getBucket();
+ if (bucketList == null) {
+ bucketList = Collections.<Bucket>emptyList();
+ }
+ for (Bucket bucket : bucketList) {
+ List<Action> actions = bucket.getAction();
+ if (actions == null) {
+ actions = Collections.<Action>emptyList();
+ }
+ for (Action action : actions) {
//chained-port
if (action.getAction().getImplementedInterface().getName()
.equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) {
else if (action.getAction().getImplementedInterface().getName()
.equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) {
Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
- if (!alreadyInstalledGroupids.contains(groupId)) {
+ ListenableFuture<?> future =
+ groupFutures.get(groupId);
+ if (future == null) {
okToInstall = false;
break;
}
+
+ // Need to ensure that the group specified
+ // by group-action is already installed.
+ awaitGroup(sNode, future);
}
}
if (!okToInstall) {
//increment retry counter value
- counter.incrementAndGet();
+ counter++;
break;
}
-
-
}
-
if (okToInstall) {
- final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
- nodeIdentity.child(Group.class, group.getKey());
- provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
- alreadyInstalledGroupids.add(group.getGroupId().getValue());
+ addGroup(groupFutures, group);
iterator.remove();
// resetting the counter to zero
- counter.set(0);
+ counter = 0;
}
}
}
for (Group group : toBeInstalledGroups) {
LOG.error("Installing the group {} finally although the port is not up after checking for {} times "
, group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount());
- final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
- nodeIdentity.child(Group.class, group.getKey());
- provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
+ addGroup(groupFutures, group);
}
}
/* Meters */
nodeIdentity.child(Meter.class, meter.getKey());
provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
}
+
+ // Need to wait for all groups to be installed before adding
+ // flows.
+ awaitGroups(sNode, groupFutures.values());
+
/* Flows */
List<Table> tables = flowNode.get().getTable() != null
? flowNode.get().getTable() : Collections.<Table>emptyList();
/* clean transaction */
trans.close();
}
+
+ /**
+ * Invoke add-group RPC, and put listenable future associated with the
+ * RPC into the given map.
+ *
+ * @param map The map to store listenable futures associated with
+ * add-group RPC.
+ * @param group The group to add.
+ */
+ private void addGroup(Map<Long, ListenableFuture<?>> map, Group group) {
+ KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
+ nodeIdentity.child(Group.class, group.getKey());
+ final Long groupId = group.getGroupId().getValue();
+ ListenableFuture<?> future = JdkFutureAdapters.listenInPoolThread(
+ provider.getGroupCommiter().add(
+ groupIdent, group, nodeIdentity));
+
+ Futures.addCallback(future, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object result) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("add-group RPC completed: node={}, id={}",
+ nodeIdentity.firstKeyOf(Node.class).getId().
+ getValue(), groupId);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ String msg = "add-group RPC failed: node=" +
+ nodeIdentity.firstKeyOf(Node.class).getId().getValue() +
+ ", id=" + groupId;
+ LOG.error(msg, cause);
+ }
+ });
+
+ map.put(groupId, future);
+ }
+
+ /**
+ * Wait for completion of add-group RPC.
+ *
+ * @param nodeId The identifier for the target node.
+ * @param future Future associated with add-group RPC that installs
+ * the target group.
+ */
+ private void awaitGroup(String nodeId, ListenableFuture<?> future) {
+ awaitGroups(nodeId, Collections.singleton(future));
+ }
+
+ /**
+ * Wait for completion of add-group RPCs.
+ *
+ * @param nodeId The identifier for the target node.
+ * @param futures A collection of futures associated with add-group
+ * RPCs.
+ */
+ private void awaitGroups(String nodeId,
+ Collection<ListenableFuture<?>> futures) {
+ if (!futures.isEmpty()) {
+ long timeout = Math.min(
+ ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT);
+ try {
+ Futures.successfulAsList(futures).
+ get(timeout, TimeUnit.NANOSECONDS);
+ LOG.trace("awaitGroups() completed: node={}", nodeId);
+ } catch (TimeoutException e) {
+ LOG.warn("add-group RPCs did not complete: node={}",
+ nodeId);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception while waiting for group installation on node {}",
+ nodeId, e);
+ }
+ }
+ }
}
+
private BigInteger getDpnIdFromNodeName(String nodeName) {
+
String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
return new BigInteger(dpId);
}