import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
}
/**
+ * Creates a single rpc result of type Void honoring all partial rpc results.
+ *
* @param previousItemAction description for case when the triggering future contains failure
* @param <D> type of rpc output (gathered in list)
* @return single rpc result of type Void honoring all partial rpc results
*/
- public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(final String previousItemAction) {
+ public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
+ final String previousItemAction) {
return input -> {
final RpcResultBuilder<Void> resultSink;
if (input != null) {
}
/**
+ * Creates a single rpc result of type Void honoring all partial rpc results.
+ *
* @param actionDescription description for case when the triggering future contains failure
* @param <D> type of rpc output (gathered in list)
* @return single rpc result of type Void honoring all partial rpc results
*/
- public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(final String actionDescription) {
+ public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
+ final String actionDescription) {
return input -> {
final RpcResultBuilder<Void> resultSink;
if (input != null) {
}
/**
- * @param nodeIdent flow capable node path - target device for routed rpc
+ * Flushes a chain barrier.
+ *
+ * @param nodeIdent flow capable node path - target device for routed rpc
* @param flowCapableTransactionService barrier rpc service
* @return async barrier result
*/
final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
.setNode(new NodeRef(nodeIdent))
.build();
- return JdkFutureAdapters.listenInPoolThread(flowCapableTransactionService.sendBarrier(barrierInput));
+ ListenableFuture<RpcResult<SendBarrierOutput>> result
+ = flowCapableTransactionService.sendBarrier(barrierInput);
+
+ return Futures.transformAsync(result, input1 -> {
+ if (input1.isSuccessful()) {
+ return Futures.<RpcResult<Void>>immediateFuture(RpcResultBuilder.<Void>success().build());
+ } else {
+ return Futures.<RpcResult<Void>>immediateFailedFuture(null);
+ }
+ });
};
}
/**
- * @param nodeId target node
+ * Returns a list of safe synchronization steps with updates.
+ *
+ * @param nodeId target node
* @param installedGroupsArg groups resent on device
* @param pendingGroups groups configured for device
* @return list of safe synchronization steps with updates
}
/**
+ * Returns a list of safe synchronization steps.
+ *
* @param nodeId target node
* @param installedGroupsArg groups resent on device
* @param pendingGroups groups configured for device
} else {
if (checkGroupPrecondition(installedGroups.keySet(), group)) {
iterator.remove();
- LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId);
- stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group));
+ LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
+ nodeId);
+ stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
+ group));
}
}
}
installedGroups.putAll(installIncrement);
plan.add(stepPlan);
} else if (!pendingGroups.isEmpty()) {
- LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " +
- "resolving stuck at level {}", nodeId.getValue(), plan.size());
+ LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
+ + "resolving stuck at level {}", nodeId.getValue(), plan.size());
throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
}
}
for (Action action : bucket.getAction()) {
// if the output action is a group
if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) {
- Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
+ Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
// see if that output group is installed
if (!installedGroupIds.contains(groupId)) {
// if not installed, we have missing dependencies and cannot install this pending group
}
/**
+ * Resolves meter differences.
+ *
* @param nodeId target node
* @param meterOperationalMap meters present on device
* @param metersConfigured meters configured for device
}
/**
+ * Resolves flow differences in a table.
+ *
* @param flowsConfigured flows resent on device
* @param flowOperationalMap flows configured for device
* @param gatherUpdates check content of pending item if present on device (and create update task eventually)
* @return list of safe synchronization steps
*/
- public static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
- final Map<SwitchFlowId, Flow> flowOperationalMap,
- final boolean gatherUpdates) {
+ private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
+ final Map<FlowDescriptor, Flow> flowOperationalMap,
+ final boolean gatherUpdates) {
final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
// loop configured flows and check if already present on device
for (final Flow flow : flowsConfigured) {
}
/**
+ * Resolves flow differences in all tables.
+ *
* @param nodeId target node
* @param tableOperationalMap flow-tables resent on device
* @param tablesConfigured flow-tables configured for device
* @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
*/
public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
- final Map<Short, Table> tableOperationalMap,
- final List<Table> tablesConfigured,
- final boolean gatherUpdates) {
+ final Map<Short, Table> tableOperationalMap, final List<Table> tablesConfigured,
+ final boolean gatherUpdates) {
LOG.trace("resolving flows in tables for {}", nodeId.getValue());
final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
for (final Table tableConfigured : tablesConfigured) {
// lookup table (on device)
final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
// wrap existing (on device) flows in current table into map
- final Map<SwitchFlowId, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
+ final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
tableOperational != null
? tableOperational.getFlow()
: null);
final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
flowsConfigured, flowOperationalMap, gatherUpdates);
if (!flowsSyncBox.isEmpty()) {
- tableFlowSyncBoxes.put(tableConfigured.getKey(), flowsSyncBox);
+ tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox);
}
}
return tableFlowSyncBoxes;