/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.applications.frsync.util; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.common.Uint32; import org.opendaylight.yangtools.yang.common.Uint8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Util methods for group reconcil task (future chaining, transforms). */ public final class ReconcileUtil { private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class); private ReconcileUtil() { throw new IllegalStateException("This class should not be instantiated."); } /** * Creates a single rpc result of type Void honoring all partial rpc results. * * @param previousItemAction description for case when the triggering future contains failure * @param type of rpc output (gathered in list) * @return single rpc result of type Void honoring all partial rpc results */ public static Function>, RpcResult> createRpcResultCondenser( final String previousItemAction) { return input -> { final RpcResultBuilder resultSink; if (input != null) { List errors = new ArrayList<>(); for (RpcResult rpcResult : input) { if (!rpcResult.isSuccessful()) { errors.addAll(rpcResult.getErrors()); } } if (errors.isEmpty()) { resultSink = RpcResultBuilder.success(); } else { resultSink = RpcResultBuilder.failed().withRpcErrors(errors); } } else { resultSink = RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed"); } return resultSink.build(); }; } /** * Creates a single rpc result of type Void honoring all partial rpc results. * * @param actionDescription description for case when the triggering future contains failure * @param type of rpc output (gathered in list) * @return single rpc result of type Void honoring all partial rpc results */ public static Function, RpcResult> createRpcResultToVoidFunction( final String actionDescription) { return input -> { final RpcResultBuilder resultSink; if (input != null) { List errors = new ArrayList<>(); if (!input.isSuccessful()) { errors.addAll(input.getErrors()); resultSink = RpcResultBuilder.failed().withRpcErrors(errors); } else { resultSink = RpcResultBuilder.success(); } } else { resultSink = RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed"); } return resultSink.build(); }; } /** * Flushes a chain barrier. * * @param nodeIdent flow capable node path - target device for routed rpc * @param flowCapableTransactionService barrier rpc service * @return async barrier result */ public static AsyncFunction, RpcResult> chainBarrierFlush( final InstanceIdentifier nodeIdent, final FlowCapableTransactionService flowCapableTransactionService) { return input -> { final SendBarrierInput barrierInput = new SendBarrierInputBuilder() .setNode(new NodeRef(nodeIdent)) .build(); ListenableFuture> result = flowCapableTransactionService.sendBarrier(barrierInput); return Futures.transformAsync(result, input1 -> { if (input1.isSuccessful()) { return Futures.immediateFuture(RpcResultBuilder.success().build()); } else { return Futures.immediateFailedFuture(null); } }, MoreExecutors.directExecutor()); }; } /** * Returns a list of safe synchronization steps with updates. * * @param nodeId target node * @param installedGroupsArg groups resent on device * @param pendingGroups groups configured for device * @return list of safe synchronization steps with updates */ public static List> resolveAndDivideGroupDiffs(final NodeId nodeId, final Map installedGroupsArg, final Collection pendingGroups) { return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true); } /** * Returns a list of safe synchronization steps. * * @param nodeId target node * @param installedGroupsArg groups resent on device * @param pendingGroups groups configured for device * @param gatherUpdates check content of pending item if present on device (and create update task eventually) * @return list of safe synchronization steps */ public static List> resolveAndDivideGroupDiffs(final NodeId nodeId, final Map installedGroupsArg, final Collection pendingGroups, final boolean gatherUpdates) { final Map installedGroups = new HashMap<>(installedGroupsArg); final List> plan = new ArrayList<>(); while (!Iterables.isEmpty(pendingGroups)) { final ItemSyncBox stepPlan = new ItemSyncBox<>(); final Iterator iterator = pendingGroups.iterator(); final Map installIncrement = new HashMap<>(); while (iterator.hasNext()) { final Group group = iterator.next(); final Group existingGroup = installedGroups.get(group.getGroupId().getValue()); if (existingGroup != null) { if (!gatherUpdates) { iterator.remove(); } else { // check buckets and eventually update if (group.equals(existingGroup)) { iterator.remove(); } else { if (checkGroupPrecondition(installedGroups.keySet(), group)) { iterator.remove(); LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId); stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group)); } } } } else if (checkGroupPrecondition(installedGroups.keySet(), group)) { iterator.remove(); installIncrement.put(group.getGroupId().getValue(), group); stepPlan.getItemsToPush().add(group); } } if (!stepPlan.isEmpty()) { // atomic update of installed flows in order to keep plan portions clean of local group dependencies installedGroups.putAll(installIncrement); plan.add(stepPlan); } else if (!pendingGroups.isEmpty()) { LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " + "resolving stuck at level {}", nodeId.getValue(), plan.size()); throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions"); } } return plan; } public static boolean checkGroupPrecondition(final Set installedGroupIds, final Group pendingGroup) { boolean okToInstall = true; // check each bucket in the pending group for (Bucket bucket : pendingGroup.getBuckets().nonnullBucket().values()) { for (Action action : bucket.nonnullAction().values()) { // if the output action is a group if (GroupActionCase.class.equals(action.getAction().implementedInterface())) { Uint32 groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId(); // see if that output group is installed if (!installedGroupIds.contains(groupId)) { // if not installed, we have missing dependencies and cannot install this pending group okToInstall = false; break; } } } if (!okToInstall) { break; } } return okToInstall; } public static int countTotalPushed(final Iterable> groupsAddPlan) { int count = 0; for (ItemSyncBox groupItemSyncBox : groupsAddPlan) { count += groupItemSyncBox.getItemsToPush().size(); } return count; } public static int countTotalUpdated(final Iterable> groupsAddPlan) { int count = 0; for (ItemSyncBox groupItemSyncBox : groupsAddPlan) { count += groupItemSyncBox.getItemsToUpdate().size(); } return count; } /** * Resolves meter differences. * * @param nodeId target node * @param meterOperationalMap meters present on device * @param metersConfigured meters configured for device * @param gatherUpdates check content of pending item if present on device (and create update task eventually) * @return synchronization box */ public static ItemSyncBox resolveMeterDiffs(final NodeId nodeId, final Map meterOperationalMap, final Collection metersConfigured, final boolean gatherUpdates) { LOG.trace("resolving meters for {}", nodeId.getValue()); final ItemSyncBox syncBox = new ItemSyncBox<>(); for (Meter meter : metersConfigured) { final Meter existingMeter = meterOperationalMap.get(meter.getMeterId()); if (existingMeter == null) { syncBox.getItemsToPush().add(meter); } else { // compare content and eventually update if (gatherUpdates && !meter.equals(existingMeter)) { syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter)); } } } return syncBox; } /** * Resolves flow differences in a table. * * @param flowsConfigured flows resent on device * @param flowOperationalMap flows configured for device * @param gatherUpdates check content of pending item if present on device (and create update task eventually) * @return list of safe synchronization steps */ private static ItemSyncBox resolveFlowDiffsInTable(final Collection flowsConfigured, final Map flowOperationalMap, final boolean gatherUpdates) { final ItemSyncBox flowsSyncBox = new ItemSyncBox<>(); // loop configured flows and check if already present on device for (final Flow flow : flowsConfigured) { final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap); if (existingFlow == null) { flowsSyncBox.getItemsToPush().add(flow); } else { // check instructions and eventually update if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) { flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow)); } } } return flowsSyncBox; } /** * Resolves flow differences in all tables. * * @param nodeId target node * @param tableOperationalMap flow-tables resent on device * @param tablesConfigured flow-tables configured for device * @param gatherUpdates check content of pending item if present on device (and create update task eventually) * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps */ public static Map> resolveFlowDiffsInAllTables(final NodeId nodeId, final Map tableOperationalMap, final Collection tablesConfigured, final boolean gatherUpdates) { LOG.trace("resolving flows in tables for {}", nodeId.getValue()); final Map> tableFlowSyncBoxes = new HashMap<>(); for (final Table tableConfigured : tablesConfigured) { final Collection flowsConfigured = tableConfigured.nonnullFlow().values(); if (flowsConfigured.isEmpty()) { continue; } // lookup table (on device) final Table tableOperational = tableOperationalMap.get(tableConfigured.getId()); // wrap existing (on device) flows in current table into map final Map flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap( tableOperational != null ? tableOperational.nonnullFlow().values() : null); final ItemSyncBox flowsSyncBox = resolveFlowDiffsInTable( flowsConfigured, flowOperationalMap, gatherUpdates); if (!flowsSyncBox.isEmpty()) { tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox); } } return tableFlowSyncBoxes; } public static Collection safeGroups(FlowCapableNode node) { return node == null ? Collections.emptyList() : node.nonnullGroup().values(); } public static Collection
safeTables(FlowCapableNode node) { return node == null ? Collections.emptyList() : node.nonnullTable().values(); } public static Collection safeMeters(FlowCapableNode node) { return node == null ? Collections.emptyList() : node.nonnullMeter().values(); } }