2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.util;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterables;
16 import com.google.common.util.concurrent.AsyncFunction;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.Objects;
28 import javax.annotation.Nullable;
29 import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
 * Utility methods for the group reconciliation task (future chaining, transforms).
56 public class ReconcileUtil {
58 private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
61 * @param previousItemAction description for case when the triggering future contains failure
62 * @param <D> type of rpc output (gathered in list)
63 * @return single rpc result of type Void honoring all partial rpc results
65 public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(final String previousItemAction) {
66 return new Function<List<RpcResult<D>>, RpcResult<Void>>() {
69 public RpcResult<Void> apply(@Nullable final List<RpcResult<D>> input) {
70 final RpcResultBuilder<Void> resultSink;
72 List<RpcError> errors = new ArrayList<>();
73 for (RpcResult<D> rpcResult : input) {
74 if (!rpcResult.isSuccessful()) {
75 errors.addAll(rpcResult.getErrors());
78 if (errors.isEmpty()) {
79 resultSink = RpcResultBuilder.success();
81 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
84 resultSink = RpcResultBuilder.<Void>failed()
85 .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
89 return resultSink.build();
95 * @param actionDescription description for case when the triggering future contains failure
96 * @param <D> type of rpc output (gathered in list)
97 * @return single rpc result of type Void honoring all partial rpc results
99 public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(final String actionDescription) {
100 return new Function<RpcResult<D>, RpcResult<Void>>() {
103 public RpcResult<Void> apply(@Nullable final RpcResult<D> input) {
104 final RpcResultBuilder<Void> resultSink;
106 List<RpcError> errors = new ArrayList<>();
107 if (!input.isSuccessful()) {
108 errors.addAll(input.getErrors());
109 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
111 resultSink = RpcResultBuilder.success();
114 resultSink = RpcResultBuilder.<Void>failed()
115 .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
119 return resultSink.build();
125 * @param nodeIdent flow capable node path - target device for routed rpc
126 * @param flowCapableTransactionService barrier rpc service
127 * @return async barrier result
129 public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
130 final InstanceIdentifier<Node> nodeIdent,
131 final FlowCapableTransactionService flowCapableTransactionService) {
132 return new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
134 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
135 final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
136 .setNode(new NodeRef(nodeIdent))
138 return JdkFutureAdapters.listenInPoolThread(flowCapableTransactionService.sendBarrier(barrierInput));
144 * @param nodeId target node
145 * @param installedGroupsArg groups resent on device
146 * @param pendingGroups groups configured for device
147 * @return list of safe synchronization steps with updates
149 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
150 final Map<Long, Group> installedGroupsArg,
151 final Collection<Group> pendingGroups) {
152 return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
156 * @param nodeId target node
157 * @param installedGroupsArg groups resent on device
158 * @param pendingGroups groups configured for device
159 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
160 * @return list of safe synchronization steps
162 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
163 final Map<Long, Group> installedGroupsArg,
164 final Collection<Group> pendingGroups,
165 final boolean gatherUpdates) {
167 final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
168 final List<ItemSyncBox<Group>> plan = new ArrayList<>();
170 while (!Iterables.isEmpty(pendingGroups)) {
171 final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
172 final Iterator<Group> iterator = pendingGroups.iterator();
173 final Map<Long, Group> installIncrement = new HashMap<>();
175 while (iterator.hasNext()) {
176 final Group group = iterator.next();
178 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
179 if (existingGroup != null) {
180 if (!gatherUpdates) {
183 // check buckets and eventually update
184 if (group.equals(existingGroup)) {
187 if (checkGroupPrecondition(installedGroups.keySet(), group)) {
189 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(), nodeId);
190 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup, group));
194 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
196 installIncrement.put(group.getGroupId().getValue(), group);
197 stepPlan.getItemsToPush().add(group);
201 if (!stepPlan.isEmpty()) {
202 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
203 installedGroups.putAll(installIncrement);
205 } else if (!pendingGroups.isEmpty()) {
206 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, " +
207 "resolving stuck at level {}", nodeId.getValue(), plan.size());
208 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
215 public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
216 boolean okToInstall = true;
217 // check each bucket in the pending group
218 for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
219 for (Action action : bucket.getAction()) {
220 // if the output action is a group
221 if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) {
222 Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
223 // see if that output group is installed
224 if (!installedGroupIds.contains(groupId)) {
225 // if not installed, we have missing dependencies and cannot install this pending group
238 public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
240 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
241 count += groupItemSyncBox.getItemsToPush().size();
246 public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
248 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
249 count += groupItemSyncBox.getItemsToUpdate().size();
255 * @param nodeId target node
256 * @param meterOperationalMap meters present on device
257 * @param metersConfigured meters configured for device
258 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
259 * @return synchronization box
261 public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
262 final Map<MeterId, Meter> meterOperationalMap,
263 final List<Meter> metersConfigured,
264 final boolean gatherUpdates) {
265 LOG.trace("resolving meters for {}", nodeId);
266 final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
267 for (Meter meter : metersConfigured) {
268 final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
269 if (existingMeter == null) {
270 syncBox.getItemsToPush().add(meter);
272 // compare content and eventually update
273 if (gatherUpdates && !meter.equals(existingMeter)) {
274 syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
282 * @param flowsConfigured flows resent on device
283 * @param flowOperationalMap flows configured for device
284 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
285 * @return list of safe synchronization steps
288 static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
289 final Map<SwitchFlowId, Flow> flowOperationalMap,
290 final boolean gatherUpdates) {
291 final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
292 // loop configured flows and check if already present on device
293 for (final Flow flow : flowsConfigured) {
294 final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
296 if (existingFlow == null) {
297 flowsSyncBox.getItemsToPush().add(flow);
299 // check instructions and eventually update
300 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
301 flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
309 * @param nodeId target node
310 * @param tableOperationalMap flow-tables resent on device
311 * @param tablesConfigured flow-tables configured for device
312 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
313 * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
315 public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
316 final Map<Short, Table> tableOperationalMap,
317 final List<Table> tablesConfigured,
318 final boolean gatherUpdates) {
319 LOG.trace("resolving flows in tables for {}", nodeId);
320 final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
321 for (final Table tableConfigured : tablesConfigured) {
322 final List<Flow> flowsConfigured = tableConfigured.getFlow();
323 if (flowsConfigured == null || flowsConfigured.isEmpty()) {
327 // lookup table (on device)
328 final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
329 // wrap existing (on device) flows in current table into map
330 final Map<SwitchFlowId, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
331 tableOperational != null
332 ? tableOperational.getFlow()
336 final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
337 flowsConfigured, flowOperationalMap, gatherUpdates);
338 if (!flowsSyncBox.isEmpty()) {
339 tableFlowSyncBoxes.put(tableConfigured.getKey(), flowsSyncBox);
342 return tableFlowSyncBoxes;
345 public static List<Group> safeGroups(FlowCapableNode node) {
347 return Collections.emptyList();
350 return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.<Group>of());
353 public static List<Table> safeTables(FlowCapableNode node) {
355 return Collections.emptyList();
358 return MoreObjects.firstNonNull(node.getTable(), ImmutableList.<Table>of());
361 public static List<Meter> safeMeters(FlowCapableNode node) {
363 return Collections.emptyList();
366 return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.<Meter>of());