cb695b691a2aa1d5a9dc6408d3bcd5fe20742ac5
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / util / ReconcileUtil.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.util;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterables;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Objects;
26 import java.util.Set;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.common.RpcError;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Utility methods for the group reconcile task (future chaining, transforms).
53  */
54 public final class ReconcileUtil {
55
56     private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
57
58     private ReconcileUtil() {
59         throw new IllegalStateException("This class should not be instantiated.");
60     }
61
62     /**
63      * Creates a single rpc result of type Void honoring all partial rpc results.
64      *
65      * @param previousItemAction description for case when the triggering future contains failure
66      * @param <D>                type of rpc output (gathered in list)
67      * @return single rpc result of type Void honoring all partial rpc results
68      */
69     public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
70             final String previousItemAction) {
71         return input -> {
72             final RpcResultBuilder<Void> resultSink;
73             if (input != null) {
74                 List<RpcError> errors = new ArrayList<>();
75                 for (RpcResult<D> rpcResult : input) {
76                     if (!rpcResult.isSuccessful()) {
77                         errors.addAll(rpcResult.getErrors());
78                     }
79                 }
80                 if (errors.isEmpty()) {
81                     resultSink = RpcResultBuilder.success();
82                 } else {
83                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
84                 }
85             } else {
86                 resultSink = RpcResultBuilder.<Void>failed()
87                         .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
88             }
89             return resultSink.build();
90         };
91     }
92
93     /**
94      * Creates a single rpc result of type Void honoring all partial rpc results.
95      *
96      * @param actionDescription description for case when the triggering future contains failure
97      * @param <D>               type of rpc output (gathered in list)
98      * @return single rpc result of type Void honoring all partial rpc results
99      */
100     public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
101             final String actionDescription) {
102         return input -> {
103             final RpcResultBuilder<Void> resultSink;
104             if (input != null) {
105                 List<RpcError> errors = new ArrayList<>();
106                 if (!input.isSuccessful()) {
107                     errors.addAll(input.getErrors());
108                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
109                 } else {
110                     resultSink = RpcResultBuilder.success();
111                 }
112             } else {
113                 resultSink = RpcResultBuilder.<Void>failed()
114                         .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
115             }
116             return resultSink.build();
117         };
118     }
119
120     /**
121      * Flushes a chain barrier.
122      *
123      * @param nodeIdent flow capable node path - target device for routed rpc
124      * @param flowCapableTransactionService barrier rpc service
125      * @return async barrier result
126      */
127     public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
128             final InstanceIdentifier<Node> nodeIdent,
129             final FlowCapableTransactionService flowCapableTransactionService) {
130         return input -> {
131             final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
132                     .setNode(new NodeRef(nodeIdent))
133                     .build();
134             ListenableFuture<RpcResult<SendBarrierOutput>> result
135                     = flowCapableTransactionService.sendBarrier(barrierInput);
136
137             return Futures.transformAsync(result, input1 -> {
138                 if (input1.isSuccessful()) {
139                     return Futures.<RpcResult<Void>>immediateFuture(RpcResultBuilder.<Void>success().build());
140                 } else {
141                     return Futures.<RpcResult<Void>>immediateFailedFuture(null);
142                 }
143             });
144         };
145     }
146
147     /**
148      * Returns a list of safe synchronization steps with updates.
149      *
150      * @param nodeId target node
151      * @param installedGroupsArg groups resent on device
152      * @param pendingGroups      groups configured for device
153      * @return list of safe synchronization steps with updates
154      */
155     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
156                                                                       final Map<Long, Group> installedGroupsArg,
157                                                                       final Collection<Group> pendingGroups) {
158         return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
159     }
160
161     /**
162      * Returns a list of safe synchronization steps.
163      *
164      * @param nodeId             target node
165      * @param installedGroupsArg groups resent on device
166      * @param pendingGroups      groups configured for device
167      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
168      * @return list of safe synchronization steps
169      */
170     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
171                                                                       final Map<Long, Group> installedGroupsArg,
172                                                                       final Collection<Group> pendingGroups,
173                                                                       final boolean gatherUpdates) {
174         final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
175         final List<ItemSyncBox<Group>> plan = new ArrayList<>();
176
177         while (!Iterables.isEmpty(pendingGroups)) {
178             final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
179             final Iterator<Group> iterator = pendingGroups.iterator();
180             final Map<Long, Group> installIncrement = new HashMap<>();
181
182             while (iterator.hasNext()) {
183                 final Group group = iterator.next();
184
185                 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
186                 if (existingGroup != null) {
187                     if (!gatherUpdates) {
188                         iterator.remove();
189                     } else {
190                         // check buckets and eventually update
191                         if (group.equals(existingGroup)) {
192                             iterator.remove();
193                         } else {
194                             if (checkGroupPrecondition(installedGroups.keySet(), group)) {
195                                 iterator.remove();
196                                 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
197                                         nodeId);
198                                 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
199                                         group));
200                             }
201                         }
202                     }
203                 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
204                     iterator.remove();
205                     installIncrement.put(group.getGroupId().getValue(), group);
206                     stepPlan.getItemsToPush().add(group);
207                 }
208             }
209
210             if (!stepPlan.isEmpty()) {
211                 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
212                 installedGroups.putAll(installIncrement);
213                 plan.add(stepPlan);
214             } else if (!pendingGroups.isEmpty()) {
215                 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
216                         + "resolving stuck at level {}", nodeId.getValue(), plan.size());
217                 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
218             }
219         }
220
221         return plan;
222     }
223
224     public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
225         boolean okToInstall = true;
226         // check each bucket in the pending group
227         for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
228             for (Action action : bucket.getAction()) {
229                 // if the output action is a group
230                 if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) {
231                     Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
232                     // see if that output group is installed
233                     if (!installedGroupIds.contains(groupId)) {
234                         // if not installed, we have missing dependencies and cannot install this pending group
235                         okToInstall = false;
236                         break;
237                     }
238                 }
239             }
240             if (!okToInstall) {
241                 break;
242             }
243         }
244         return okToInstall;
245     }
246
247     public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
248         int count = 0;
249         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
250             count += groupItemSyncBox.getItemsToPush().size();
251         }
252         return count;
253     }
254
255     public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
256         int count = 0;
257         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
258             count += groupItemSyncBox.getItemsToUpdate().size();
259         }
260         return count;
261     }
262
263     /**
264      * Resolves meter differences.
265      *
266      * @param nodeId              target node
267      * @param meterOperationalMap meters present on device
268      * @param metersConfigured    meters configured for device
269      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
270      * @return synchronization box
271      */
272     public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
273                                                        final Map<MeterId, Meter> meterOperationalMap,
274                                                        final List<Meter> metersConfigured,
275                                                        final boolean gatherUpdates) {
276         LOG.trace("resolving meters for {}", nodeId.getValue());
277         final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
278         for (Meter meter : metersConfigured) {
279             final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
280             if (existingMeter == null) {
281                 syncBox.getItemsToPush().add(meter);
282             } else {
283                 // compare content and eventually update
284                 if (gatherUpdates && !meter.equals(existingMeter)) {
285                     syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
286                 }
287             }
288         }
289         return syncBox;
290     }
291
292     /**
293      * Resolves flow differences in a table.
294      *
295      * @param flowsConfigured    flows resent on device
296      * @param flowOperationalMap flows configured for device
297      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
298      * @return list of safe synchronization steps
299      */
300     private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
301                                                             final Map<FlowDescriptor, Flow> flowOperationalMap,
302                                                             final boolean gatherUpdates) {
303         final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
304         // loop configured flows and check if already present on device
305         for (final Flow flow : flowsConfigured) {
306             final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
307
308             if (existingFlow == null) {
309                 flowsSyncBox.getItemsToPush().add(flow);
310             } else {
311                 // check instructions and eventually update
312                 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
313                     flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
314                 }
315             }
316         }
317         return flowsSyncBox;
318     }
319
320     /**
321      * Resolves flow differences in all tables.
322      *
323      * @param nodeId              target node
324      * @param tableOperationalMap flow-tables resent on device
325      * @param tablesConfigured    flow-tables configured for device
326      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
327      * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
328      */
329     public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
330             final Map<Short, Table> tableOperationalMap, final List<Table> tablesConfigured,
331             final boolean gatherUpdates) {
332         LOG.trace("resolving flows in tables for {}", nodeId.getValue());
333         final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
334         for (final Table tableConfigured : tablesConfigured) {
335             final List<Flow> flowsConfigured = tableConfigured.getFlow();
336             if (flowsConfigured == null || flowsConfigured.isEmpty()) {
337                 continue;
338             }
339
340             // lookup table (on device)
341             final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
342             // wrap existing (on device) flows in current table into map
343             final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
344                     tableOperational != null
345                             ? tableOperational.getFlow()
346                             : null);
347
348
349             final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
350                     flowsConfigured, flowOperationalMap, gatherUpdates);
351             if (!flowsSyncBox.isEmpty()) {
352                 tableFlowSyncBoxes.put(tableConfigured.getKey(), flowsSyncBox);
353             }
354         }
355         return tableFlowSyncBoxes;
356     }
357
358     public static List<Group> safeGroups(FlowCapableNode node) {
359         if (node == null) {
360             return Collections.emptyList();
361         }
362
363         return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.<Group>of());
364     }
365
366     public static List<Table> safeTables(FlowCapableNode node) {
367         if (node == null) {
368             return Collections.emptyList();
369         }
370
371         return MoreObjects.firstNonNull(node.getTable(), ImmutableList.<Table>of());
372     }
373
374     public static List<Meter> safeMeters(FlowCapableNode node) {
375         if (node == null) {
376             return Collections.emptyList();
377         }
378
379         return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.<Meter>of());
380     }
381 }