X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-sync%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrsync%2Futil%2FReconcileUtil.java;h=af58030a84019442df3a69a5782b3d9bcbcefcdf;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=913606ccabd7fa81d5e5a1b2457b39ee4a71ed81;hpb=dd8fdfa0effe55bfda266ab8e0b7aaf92caa98ae;p=openflowplugin.git diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java index 913606ccab..af58030a84 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java @@ -1,23 +1,27 @@ -/** +/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.frsync.util; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -28,6 +32,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; @@ -35,95 +40,127 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.opendaylight.yangtools.yang.common.Uint8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - /** * Util methods for group reconcil task (future chaining, transforms). */ -public class ReconcileUtil { - +public final class ReconcileUtil { private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class); + private ReconcileUtil() { + // Hidden on purpose + } + /** + * Creates a single rpc result of type Void honoring all partial rpc results. + * * @param previousItemAction description for case when the triggering future contains failure * @param type of rpc output (gathered in list) * @return single rpc result of type Void honoring all partial rpc results */ - public static Function>, RpcResult> createRpcResultCondenser(final String previousItemAction) { - return new Function>, RpcResult>() { - @Nullable - @Override - public RpcResult apply(@Nullable final List> input) { - final RpcResultBuilder resultSink; - if (input != null) { - List errors = new ArrayList<>(); - for (RpcResult rpcResult : input) { - if (!rpcResult.isSuccessful()) { - errors.addAll(rpcResult.getErrors()); - } - } - if (errors.isEmpty()) { - resultSink = RpcResultBuilder.success(); - } else { - resultSink = RpcResultBuilder.failed().withRpcErrors(errors); + public static Function>, RpcResult> createRpcResultCondenser( + final String previousItemAction) { + return input -> { + final RpcResultBuilder resultSink; + if (input != null) { + List errors = new ArrayList<>(); + for (RpcResult rpcResult : input) { + if (!rpcResult.isSuccessful()) { + errors.addAll(rpcResult.getErrors()); } + } + if (errors.isEmpty()) { + resultSink = RpcResultBuilder.success(); } else { - resultSink = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed"); - + resultSink = RpcResultBuilder.failed().withRpcErrors(errors); } + } else { + resultSink = RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "previous " + previousItemAction + " failed"); + } + return resultSink.build(); + }; + } - return resultSink.build(); + /** + * Creates a single rpc result of type Void honoring all partial rpc results. + * + * @param actionDescription description for case when the triggering future contains failure + * @param type of rpc output (gathered in list) + * @return single rpc result of type Void honoring all partial rpc results + */ + public static Function, RpcResult> createRpcResultToVoidFunction( + final String actionDescription) { + return input -> { + final RpcResultBuilder resultSink; + if (input != null) { + List errors = new ArrayList<>(); + if (!input.isSuccessful()) { + errors.addAll(input.getErrors()); + resultSink = RpcResultBuilder.failed().withRpcErrors(errors); + } else { + resultSink = RpcResultBuilder.success(); + } + } else { + resultSink = RpcResultBuilder.failed() + .withError(ErrorType.APPLICATION, "action of " + actionDescription + " failed"); } + return resultSink.build(); }; } /** - * @param nodeIdent flow capable node path - target device for routed rpc + * Flushes a chain barrier. + * + * @param nodeIdent flow capable node path - target device for routed rpc * @param flowCapableTransactionService barrier rpc service * @return async barrier result */ public static AsyncFunction, RpcResult> chainBarrierFlush( final InstanceIdentifier nodeIdent, final FlowCapableTransactionService flowCapableTransactionService) { - return new AsyncFunction, RpcResult>() { - @Override - public ListenableFuture> apply(final RpcResult input) throws Exception { - final SendBarrierInput barrierInput = new SendBarrierInputBuilder() - .setNode(new NodeRef(nodeIdent)) - .build(); - return JdkFutureAdapters.listenInPoolThread(flowCapableTransactionService.sendBarrier(barrierInput)); - } + return input -> { + final SendBarrierInput barrierInput = new SendBarrierInputBuilder() + .setNode(new NodeRef(nodeIdent)) + .build(); + ListenableFuture> result + = flowCapableTransactionService.sendBarrier(barrierInput); + + return Futures.transformAsync(result, input1 -> { + if (input1.isSuccessful()) { + return Futures.immediateFuture(RpcResultBuilder.success().build()); + } else { + return Futures.immediateFailedFuture(null); + } + }, MoreExecutors.directExecutor()); }; } /** - * @param nodeId target node + * Returns a list of safe synchronization steps with updates. + * + * @param nodeId target node * @param installedGroupsArg groups resent on device * @param pendingGroups groups configured for device * @return list of safe synchronization steps with updates */ public static List> resolveAndDivideGroupDiffs(final NodeId nodeId, - final Map installedGroupsArg, + final Map installedGroupsArg, final Collection pendingGroups) { return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true); } /** + * Returns a list of safe synchronization steps. + * * @param nodeId target node * @param installedGroupsArg groups resent on device * @param pendingGroups groups configured for device @@ -131,36 +168,29 @@ public class ReconcileUtil { * @return list of safe synchronization steps */ public static List> resolveAndDivideGroupDiffs(final NodeId nodeId, - final Map installedGroupsArg, + final Map installedGroupsArg, final Collection pendingGroups, final boolean gatherUpdates) { - - final Map installedGroups = new HashMap<>(installedGroupsArg); + final Map installedGroups = new HashMap<>(installedGroupsArg); final List> plan = new ArrayList<>(); while (!Iterables.isEmpty(pendingGroups)) { final ItemSyncBox stepPlan = new ItemSyncBox<>(); final Iterator iterator = pendingGroups.iterator(); - final Map installIncrement = new HashMap<>(); + final Map installIncrement = new HashMap<>(); while (iterator.hasNext()) { final Group group = iterator.next(); final Group existingGroup = installedGroups.get(group.getGroupId().getValue()); if (existingGroup != null) { - if (!gatherUpdates) { + if (!gatherUpdates || group.equals(existingGroup)) { iterator.remove(); - } else { + } else if (checkGroupPrecondition(installedGroups.keySet(), group)) { // check buckets and eventually update - if (group.equals(existingGroup)) { - iterator.remove(); - } else { - if (checkGroupPrecondition(installedGroups.keySet(), group)) { - iterator.remove(); - LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId); - stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group)); - } - } + iterator.remove(); + LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId); + stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group)); } } else if (checkGroupPrecondition(installedGroups.keySet(), group)) { iterator.remove(); @@ -174,8 +204,8 @@ public class ReconcileUtil { installedGroups.putAll(installIncrement); plan.add(stepPlan); } else if (!pendingGroups.isEmpty()) { - LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " + - "resolving stuck at level {}", nodeId.getValue(), plan.size()); + LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " + + "resolving stuck at level {}", nodeId.getValue(), plan.size()); throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions"); } } @@ -183,14 +213,14 @@ public class ReconcileUtil { return plan; } - public static boolean checkGroupPrecondition(final Set installedGroupIds, final Group pendingGroup) { + public static boolean checkGroupPrecondition(final Set installedGroupIds, final Group pendingGroup) { boolean okToInstall = true; // check each bucket in the pending group - for (Bucket bucket : pendingGroup.getBuckets().getBucket()) { - for (Action action : bucket.getAction()) { + for (Bucket bucket : pendingGroup.getBuckets().nonnullBucket().values()) { + for (Action action : bucket.nonnullAction().values()) { // if the output action is a group - if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) { - Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId(); + if (GroupActionCase.class.equals(action.getAction().implementedInterface())) { + Uint32 groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId(); // see if that output group is installed if (!installedGroupIds.contains(groupId)) { // if not installed, we have missing dependencies and cannot install this pending group @@ -206,7 +236,7 @@ public class ReconcileUtil { return okToInstall; } - public static int countTotalAdds(final List> groupsAddPlan) { + public static int countTotalPushed(final Iterable> groupsAddPlan) { int count = 0; for (ItemSyncBox groupItemSyncBox : groupsAddPlan) { count += groupItemSyncBox.getItemsToPush().size(); @@ -214,7 +244,7 @@ public class ReconcileUtil { return count; } - public static int countTotalUpdated(final List> groupsAddPlan) { + public static int countTotalUpdated(final Iterable> groupsAddPlan) { int count = 0; for (ItemSyncBox groupItemSyncBox : groupsAddPlan) { count += groupItemSyncBox.getItemsToUpdate().size(); @@ -223,6 +253,8 @@ public class ReconcileUtil { } /** + * Resolves meter differences. + * * @param nodeId target node * @param meterOperationalMap meters present on device * @param metersConfigured meters configured for device @@ -231,9 +263,9 @@ public class ReconcileUtil { */ public static ItemSyncBox resolveMeterDiffs(final NodeId nodeId, final Map meterOperationalMap, - final List metersConfigured, + final Collection metersConfigured, final boolean gatherUpdates) { - LOG.trace("resolving meters for {}", nodeId); + LOG.trace("resolving meters for {}", nodeId.getValue()); final ItemSyncBox syncBox = new ItemSyncBox<>(); for (Meter meter : metersConfigured) { final Meter existingMeter = meterOperationalMap.get(meter.getMeterId()); @@ -250,15 +282,16 @@ public class ReconcileUtil { } /** + * Resolves flow differences in a table. + * * @param flowsConfigured flows resent on device * @param flowOperationalMap flows configured for device * @param gatherUpdates check content of pending item if present on device (and create update task eventually) * @return list of safe synchronization steps */ - @VisibleForTesting - static ItemSyncBox resolveFlowDiffsInTable(final List flowsConfigured, - final Map flowOperationalMap, - final boolean gatherUpdates) { + private static ItemSyncBox resolveFlowDiffsInTable(final Collection flowsConfigured, + final Map flowOperationalMap, + final boolean gatherUpdates) { final ItemSyncBox flowsSyncBox = new ItemSyncBox<>(); // loop configured flows and check if already present on device for (final Flow flow : flowsConfigured) { @@ -277,6 +310,8 @@ public class ReconcileUtil { } /** + * Resolves flow differences in all tables. + * * @param nodeId target node * @param tableOperationalMap flow-tables resent on device * @param tablesConfigured flow-tables configured for device @@ -284,56 +319,43 @@ public class ReconcileUtil { * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps */ public static Map> resolveFlowDiffsInAllTables(final NodeId nodeId, - final Map tableOperationalMap, - final List tablesConfigured, - final boolean gatherUpdates) { - LOG.trace("resolving flows in tables for {}", nodeId); + final Map tableOperationalMap, final Collection
tablesConfigured, + final boolean gatherUpdates) { + LOG.trace("resolving flows in tables for {}", nodeId.getValue()); final Map> tableFlowSyncBoxes = new HashMap<>(); for (final Table tableConfigured : tablesConfigured) { - final List flowsConfigured = tableConfigured.getFlow(); - if (flowsConfigured == null || flowsConfigured.isEmpty()) { + final Collection flowsConfigured = tableConfigured.nonnullFlow().values(); + if (flowsConfigured.isEmpty()) { continue; } // lookup table (on device) final Table tableOperational = tableOperationalMap.get(tableConfigured.getId()); // wrap existing (on device) flows in current table into map - final Map flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap( + final Map flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap( tableOperational != null - ? tableOperational.getFlow() + ? tableOperational.nonnullFlow().values() : null); final ItemSyncBox flowsSyncBox = resolveFlowDiffsInTable( flowsConfigured, flowOperationalMap, gatherUpdates); if (!flowsSyncBox.isEmpty()) { - tableFlowSyncBoxes.put(tableConfigured.getKey(), flowsSyncBox); + tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox); } } return tableFlowSyncBoxes; } - public static List safeGroups(FlowCapableNode node) { - if (node == null) { - return Collections.emptyList(); - } - - return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.of()); + public static Collection safeGroups(FlowCapableNode node) { + return node == null ? Collections.emptyList() : node.nonnullGroup().values(); } - public static List
safeTables(FlowCapableNode node) { - if (node == null) { - return Collections.emptyList(); - } - - return MoreObjects.firstNonNull(node.getTable(), ImmutableList.
of()); + public static Collection
safeTables(FlowCapableNode node) { + return node == null ? Collections.emptyList() : node.nonnullTable().values(); } - public static List safeMeters(FlowCapableNode node) { - if (node == null) { - return Collections.emptyList(); - } - - return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.of()); + public static Collection safeMeters(FlowCapableNode node) { + return node == null ? Collections.emptyList() : node.nonnullMeter().values(); } }