2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frsync.util;
10 import com.google.common.base.Function;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.List;
23 import java.util.Objects;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.opendaylight.yangtools.yang.common.RpcError;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.opendaylight.yangtools.yang.common.Uint32;
47 import org.opendaylight.yangtools.yang.common.Uint8;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 * Util methods for group reconcil task (future chaining, transforms).
54 public final class ReconcileUtil {
56 private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
58 private ReconcileUtil() {
59 throw new IllegalStateException("This class should not be instantiated.");
63 * Creates a single rpc result of type Void honoring all partial rpc results.
65 * @param previousItemAction description for case when the triggering future contains failure
66 * @param <D> type of rpc output (gathered in list)
67 * @return single rpc result of type Void honoring all partial rpc results
69 public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
70 final String previousItemAction) {
72 final RpcResultBuilder<Void> resultSink;
74 List<RpcError> errors = new ArrayList<>();
75 for (RpcResult<D> rpcResult : input) {
76 if (!rpcResult.isSuccessful()) {
77 errors.addAll(rpcResult.getErrors());
80 if (errors.isEmpty()) {
81 resultSink = RpcResultBuilder.success();
83 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
86 resultSink = RpcResultBuilder.<Void>failed()
87 .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
89 return resultSink.build();
94 * Creates a single rpc result of type Void honoring all partial rpc results.
96 * @param actionDescription description for case when the triggering future contains failure
97 * @param <D> type of rpc output (gathered in list)
98 * @return single rpc result of type Void honoring all partial rpc results
100 public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
101 final String actionDescription) {
103 final RpcResultBuilder<Void> resultSink;
105 List<RpcError> errors = new ArrayList<>();
106 if (!input.isSuccessful()) {
107 errors.addAll(input.getErrors());
108 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
110 resultSink = RpcResultBuilder.success();
113 resultSink = RpcResultBuilder.<Void>failed()
114 .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
116 return resultSink.build();
121 * Flushes a chain barrier.
123 * @param nodeIdent flow capable node path - target device for routed rpc
124 * @param flowCapableTransactionService barrier rpc service
125 * @return async barrier result
127 public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
128 final InstanceIdentifier<Node> nodeIdent,
129 final FlowCapableTransactionService flowCapableTransactionService) {
131 final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
132 .setNode(new NodeRef(nodeIdent))
134 ListenableFuture<RpcResult<SendBarrierOutput>> result
135 = flowCapableTransactionService.sendBarrier(barrierInput);
137 return Futures.transformAsync(result, input1 -> {
138 if (input1.isSuccessful()) {
139 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
141 return Futures.immediateFailedFuture(null);
143 }, MoreExecutors.directExecutor());
148 * Returns a list of safe synchronization steps with updates.
150 * @param nodeId target node
151 * @param installedGroupsArg groups resent on device
152 * @param pendingGroups groups configured for device
153 * @return list of safe synchronization steps with updates
155 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
156 final Map<Uint32, Group> installedGroupsArg,
157 final Collection<Group> pendingGroups) {
158 return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
162 * Returns a list of safe synchronization steps.
164 * @param nodeId target node
165 * @param installedGroupsArg groups resent on device
166 * @param pendingGroups groups configured for device
167 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
168 * @return list of safe synchronization steps
170 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
171 final Map<Uint32, Group> installedGroupsArg,
172 final Collection<Group> pendingGroups,
173 final boolean gatherUpdates) {
174 final Map<Uint32, Group> installedGroups = new HashMap<>(installedGroupsArg);
175 final List<ItemSyncBox<Group>> plan = new ArrayList<>();
177 while (!Iterables.isEmpty(pendingGroups)) {
178 final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
179 final Iterator<Group> iterator = pendingGroups.iterator();
180 final Map<Uint32, Group> installIncrement = new HashMap<>();
182 while (iterator.hasNext()) {
183 final Group group = iterator.next();
185 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
186 if (existingGroup != null) {
187 if (!gatherUpdates) {
190 // check buckets and eventually update
191 if (group.equals(existingGroup)) {
194 if (checkGroupPrecondition(installedGroups.keySet(), group)) {
196 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
198 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
203 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
205 installIncrement.put(group.getGroupId().getValue(), group);
206 stepPlan.getItemsToPush().add(group);
210 if (!stepPlan.isEmpty()) {
211 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
212 installedGroups.putAll(installIncrement);
214 } else if (!pendingGroups.isEmpty()) {
215 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
216 + "resolving stuck at level {}", nodeId.getValue(), plan.size());
217 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
224 public static boolean checkGroupPrecondition(final Set<Uint32> installedGroupIds, final Group pendingGroup) {
225 boolean okToInstall = true;
226 // check each bucket in the pending group
227 for (Bucket bucket : pendingGroup.getBuckets().nonnullBucket().values()) {
228 for (Action action : bucket.nonnullAction().values()) {
229 // if the output action is a group
230 if (GroupActionCase.class.equals(action.getAction().implementedInterface())) {
231 Uint32 groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
232 // see if that output group is installed
233 if (!installedGroupIds.contains(groupId)) {
234 // if not installed, we have missing dependencies and cannot install this pending group
247 public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
249 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
250 count += groupItemSyncBox.getItemsToPush().size();
255 public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
257 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
258 count += groupItemSyncBox.getItemsToUpdate().size();
264 * Resolves meter differences.
266 * @param nodeId target node
267 * @param meterOperationalMap meters present on device
268 * @param metersConfigured meters configured for device
269 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
270 * @return synchronization box
272 public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
273 final Map<MeterId, Meter> meterOperationalMap,
274 final Collection<Meter> metersConfigured,
275 final boolean gatherUpdates) {
276 LOG.trace("resolving meters for {}", nodeId.getValue());
277 final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
278 for (Meter meter : metersConfigured) {
279 final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
280 if (existingMeter == null) {
281 syncBox.getItemsToPush().add(meter);
283 // compare content and eventually update
284 if (gatherUpdates && !meter.equals(existingMeter)) {
285 syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
293 * Resolves flow differences in a table.
295 * @param flowsConfigured flows resent on device
296 * @param flowOperationalMap flows configured for device
297 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
298 * @return list of safe synchronization steps
300 private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final Collection<Flow> flowsConfigured,
301 final Map<FlowDescriptor, Flow> flowOperationalMap,
302 final boolean gatherUpdates) {
303 final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
304 // loop configured flows and check if already present on device
305 for (final Flow flow : flowsConfigured) {
306 final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
308 if (existingFlow == null) {
309 flowsSyncBox.getItemsToPush().add(flow);
311 // check instructions and eventually update
312 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
313 flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
321 * Resolves flow differences in all tables.
323 * @param nodeId target node
324 * @param tableOperationalMap flow-tables resent on device
325 * @param tablesConfigured flow-tables configured for device
326 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
327 * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
329 public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
330 final Map<Uint8, Table> tableOperationalMap, final Collection<Table> tablesConfigured,
331 final boolean gatherUpdates) {
332 LOG.trace("resolving flows in tables for {}", nodeId.getValue());
333 final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
334 for (final Table tableConfigured : tablesConfigured) {
335 final Collection<Flow> flowsConfigured = tableConfigured.nonnullFlow().values();
336 if (flowsConfigured.isEmpty()) {
340 // lookup table (on device)
341 final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
342 // wrap existing (on device) flows in current table into map
343 final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
344 tableOperational != null
345 ? tableOperational.nonnullFlow().values()
349 final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
350 flowsConfigured, flowOperationalMap, gatherUpdates);
351 if (!flowsSyncBox.isEmpty()) {
352 tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox);
355 return tableFlowSyncBoxes;
358 public static Collection<Group> safeGroups(FlowCapableNode node) {
359 return node == null ? Collections.emptyList() : node.nonnullGroup().values();
362 public static Collection<Table> safeTables(FlowCapableNode node) {
363 return node == null ? Collections.emptyList() : node.nonnullTable().values();
366 public static Collection<Meter> safeMeters(FlowCapableNode node) {
367 return node == null ? Collections.emptyList() : node.nonnullMeter().values();