2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.util;
11 import com.google.common.base.Function;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterables;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.Objects;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * Util methods for group reconcil task (future chaining, transforms).
55 public final class ReconcileUtil {
57 private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
59 private ReconcileUtil() {
60 throw new IllegalStateException("This class should not be instantiated.");
64 * Creates a single rpc result of type Void honoring all partial rpc results.
66 * @param previousItemAction description for case when the triggering future contains failure
67 * @param <D> type of rpc output (gathered in list)
68 * @return single rpc result of type Void honoring all partial rpc results
70 public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
71 final String previousItemAction) {
73 final RpcResultBuilder<Void> resultSink;
75 List<RpcError> errors = new ArrayList<>();
76 for (RpcResult<D> rpcResult : input) {
77 if (!rpcResult.isSuccessful()) {
78 errors.addAll(rpcResult.getErrors());
81 if (errors.isEmpty()) {
82 resultSink = RpcResultBuilder.success();
84 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
87 resultSink = RpcResultBuilder.<Void>failed()
88 .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
90 return resultSink.build();
95 * Creates a single rpc result of type Void honoring all partial rpc results.
97 * @param actionDescription description for case when the triggering future contains failure
98 * @param <D> type of rpc output (gathered in list)
99 * @return single rpc result of type Void honoring all partial rpc results
101 public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
102 final String actionDescription) {
104 final RpcResultBuilder<Void> resultSink;
106 List<RpcError> errors = new ArrayList<>();
107 if (!input.isSuccessful()) {
108 errors.addAll(input.getErrors());
109 resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
111 resultSink = RpcResultBuilder.success();
114 resultSink = RpcResultBuilder.<Void>failed()
115 .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
117 return resultSink.build();
122 * Flushes a chain barrier.
124 * @param nodeIdent flow capable node path - target device for routed rpc
125 * @param flowCapableTransactionService barrier rpc service
126 * @return async barrier result
128 public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
129 final InstanceIdentifier<Node> nodeIdent,
130 final FlowCapableTransactionService flowCapableTransactionService) {
132 final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
133 .setNode(new NodeRef(nodeIdent))
135 ListenableFuture<RpcResult<SendBarrierOutput>> result
136 = flowCapableTransactionService.sendBarrier(barrierInput);
138 return Futures.transformAsync(result, input1 -> {
139 if (input1.isSuccessful()) {
140 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
142 return Futures.immediateFailedFuture(null);
144 }, MoreExecutors.directExecutor());
149 * Returns a list of safe synchronization steps with updates.
151 * @param nodeId target node
152 * @param installedGroupsArg groups resent on device
153 * @param pendingGroups groups configured for device
154 * @return list of safe synchronization steps with updates
156 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
157 final Map<Long, Group> installedGroupsArg,
158 final Collection<Group> pendingGroups) {
159 return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
163 * Returns a list of safe synchronization steps.
165 * @param nodeId target node
166 * @param installedGroupsArg groups resent on device
167 * @param pendingGroups groups configured for device
168 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
169 * @return list of safe synchronization steps
171 public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
172 final Map<Long, Group> installedGroupsArg,
173 final Collection<Group> pendingGroups,
174 final boolean gatherUpdates) {
175 final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
176 final List<ItemSyncBox<Group>> plan = new ArrayList<>();
178 while (!Iterables.isEmpty(pendingGroups)) {
179 final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
180 final Iterator<Group> iterator = pendingGroups.iterator();
181 final Map<Long, Group> installIncrement = new HashMap<>();
183 while (iterator.hasNext()) {
184 final Group group = iterator.next();
186 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
187 if (existingGroup != null) {
188 if (!gatherUpdates) {
191 // check buckets and eventually update
192 if (group.equals(existingGroup)) {
195 if (checkGroupPrecondition(installedGroups.keySet(), group)) {
197 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
199 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
204 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
206 installIncrement.put(group.getGroupId().getValue(), group);
207 stepPlan.getItemsToPush().add(group);
211 if (!stepPlan.isEmpty()) {
212 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
213 installedGroups.putAll(installIncrement);
215 } else if (!pendingGroups.isEmpty()) {
216 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
217 + "resolving stuck at level {}", nodeId.getValue(), plan.size());
218 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
225 public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
226 boolean okToInstall = true;
227 // check each bucket in the pending group
228 for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
229 for (Action action : bucket.getAction()) {
230 // if the output action is a group
231 if (GroupActionCase.class.equals(action.getAction().implementedInterface())) {
232 Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
233 // see if that output group is installed
234 if (!installedGroupIds.contains(groupId)) {
235 // if not installed, we have missing dependencies and cannot install this pending group
248 public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
250 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
251 count += groupItemSyncBox.getItemsToPush().size();
256 public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
258 for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
259 count += groupItemSyncBox.getItemsToUpdate().size();
265 * Resolves meter differences.
267 * @param nodeId target node
268 * @param meterOperationalMap meters present on device
269 * @param metersConfigured meters configured for device
270 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
271 * @return synchronization box
273 public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
274 final Map<MeterId, Meter> meterOperationalMap,
275 final List<Meter> metersConfigured,
276 final boolean gatherUpdates) {
277 LOG.trace("resolving meters for {}", nodeId.getValue());
278 final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
279 for (Meter meter : metersConfigured) {
280 final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
281 if (existingMeter == null) {
282 syncBox.getItemsToPush().add(meter);
284 // compare content and eventually update
285 if (gatherUpdates && !meter.equals(existingMeter)) {
286 syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
294 * Resolves flow differences in a table.
296 * @param flowsConfigured flows resent on device
297 * @param flowOperationalMap flows configured for device
298 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
299 * @return list of safe synchronization steps
301 private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
302 final Map<FlowDescriptor, Flow> flowOperationalMap,
303 final boolean gatherUpdates) {
304 final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
305 // loop configured flows and check if already present on device
306 for (final Flow flow : flowsConfigured) {
307 final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
309 if (existingFlow == null) {
310 flowsSyncBox.getItemsToPush().add(flow);
312 // check instructions and eventually update
313 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
314 flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
322 * Resolves flow differences in all tables.
324 * @param nodeId target node
325 * @param tableOperationalMap flow-tables resent on device
326 * @param tablesConfigured flow-tables configured for device
327 * @param gatherUpdates check content of pending item if present on device (and create update task eventually)
328 * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
330 public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
331 final Map<Short, Table> tableOperationalMap, final List<Table> tablesConfigured,
332 final boolean gatherUpdates) {
333 LOG.trace("resolving flows in tables for {}", nodeId.getValue());
334 final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
335 for (final Table tableConfigured : tablesConfigured) {
336 final List<Flow> flowsConfigured = tableConfigured.getFlow();
337 if (flowsConfigured == null || flowsConfigured.isEmpty()) {
341 // lookup table (on device)
342 final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
343 // wrap existing (on device) flows in current table into map
344 final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
345 tableOperational != null
346 ? tableOperational.getFlow()
350 final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
351 flowsConfigured, flowOperationalMap, gatherUpdates);
352 if (!flowsSyncBox.isEmpty()) {
353 tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox);
356 return tableFlowSyncBoxes;
359 public static List<Group> safeGroups(FlowCapableNode node) {
361 return Collections.emptyList();
364 return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.of());
367 public static List<Table> safeTables(FlowCapableNode node) {
369 return Collections.emptyList();
372 return MoreObjects.firstNonNull(node.getTable(), ImmutableList.of());
375 public static List<Meter> safeMeters(FlowCapableNode node) {
377 return Collections.emptyList();
380 return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.of());