Merge "OPNFLWPLUG-1062 Include additional LLDP fields in liblldp"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / util / ReconcileUtil.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.util;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterables;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Util methods for group reconcil task (future chaining, transforms).
54  */
55 public final class ReconcileUtil {
56
57     private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
58
59     private ReconcileUtil() {
60         throw new IllegalStateException("This class should not be instantiated.");
61     }
62
63     /**
64      * Creates a single rpc result of type Void honoring all partial rpc results.
65      *
66      * @param previousItemAction description for case when the triggering future contains failure
67      * @param <D>                type of rpc output (gathered in list)
68      * @return single rpc result of type Void honoring all partial rpc results
69      */
70     public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
71             final String previousItemAction) {
72         return input -> {
73             final RpcResultBuilder<Void> resultSink;
74             if (input != null) {
75                 List<RpcError> errors = new ArrayList<>();
76                 for (RpcResult<D> rpcResult : input) {
77                     if (!rpcResult.isSuccessful()) {
78                         errors.addAll(rpcResult.getErrors());
79                     }
80                 }
81                 if (errors.isEmpty()) {
82                     resultSink = RpcResultBuilder.success();
83                 } else {
84                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
85                 }
86             } else {
87                 resultSink = RpcResultBuilder.<Void>failed()
88                         .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
89             }
90             return resultSink.build();
91         };
92     }
93
94     /**
95      * Creates a single rpc result of type Void honoring all partial rpc results.
96      *
97      * @param actionDescription description for case when the triggering future contains failure
98      * @param <D>               type of rpc output (gathered in list)
99      * @return single rpc result of type Void honoring all partial rpc results
100      */
101     public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
102             final String actionDescription) {
103         return input -> {
104             final RpcResultBuilder<Void> resultSink;
105             if (input != null) {
106                 List<RpcError> errors = new ArrayList<>();
107                 if (!input.isSuccessful()) {
108                     errors.addAll(input.getErrors());
109                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
110                 } else {
111                     resultSink = RpcResultBuilder.success();
112                 }
113             } else {
114                 resultSink = RpcResultBuilder.<Void>failed()
115                         .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
116             }
117             return resultSink.build();
118         };
119     }
120
121     /**
122      * Flushes a chain barrier.
123      *
124      * @param nodeIdent flow capable node path - target device for routed rpc
125      * @param flowCapableTransactionService barrier rpc service
126      * @return async barrier result
127      */
128     public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
129             final InstanceIdentifier<Node> nodeIdent,
130             final FlowCapableTransactionService flowCapableTransactionService) {
131         return input -> {
132             final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
133                     .setNode(new NodeRef(nodeIdent))
134                     .build();
135             ListenableFuture<RpcResult<SendBarrierOutput>> result
136                     = flowCapableTransactionService.sendBarrier(barrierInput);
137
138             return Futures.transformAsync(result, input1 -> {
139                 if (input1.isSuccessful()) {
140                     return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
141                 } else {
142                     return Futures.immediateFailedFuture(null);
143                 }
144             }, MoreExecutors.directExecutor());
145         };
146     }
147
148     /**
149      * Returns a list of safe synchronization steps with updates.
150      *
151      * @param nodeId target node
152      * @param installedGroupsArg groups resent on device
153      * @param pendingGroups      groups configured for device
154      * @return list of safe synchronization steps with updates
155      */
156     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
157                                                                       final Map<Long, Group> installedGroupsArg,
158                                                                       final Collection<Group> pendingGroups) {
159         return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
160     }
161
162     /**
163      * Returns a list of safe synchronization steps.
164      *
165      * @param nodeId             target node
166      * @param installedGroupsArg groups resent on device
167      * @param pendingGroups      groups configured for device
168      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
169      * @return list of safe synchronization steps
170      */
171     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
172                                                                       final Map<Long, Group> installedGroupsArg,
173                                                                       final Collection<Group> pendingGroups,
174                                                                       final boolean gatherUpdates) {
175         final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
176         final List<ItemSyncBox<Group>> plan = new ArrayList<>();
177
178         while (!Iterables.isEmpty(pendingGroups)) {
179             final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
180             final Iterator<Group> iterator = pendingGroups.iterator();
181             final Map<Long, Group> installIncrement = new HashMap<>();
182
183             while (iterator.hasNext()) {
184                 final Group group = iterator.next();
185
186                 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
187                 if (existingGroup != null) {
188                     if (!gatherUpdates) {
189                         iterator.remove();
190                     } else {
191                         // check buckets and eventually update
192                         if (group.equals(existingGroup)) {
193                             iterator.remove();
194                         } else {
195                             if (checkGroupPrecondition(installedGroups.keySet(), group)) {
196                                 iterator.remove();
197                                 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
198                                         nodeId);
199                                 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
200                                         group));
201                             }
202                         }
203                     }
204                 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
205                     iterator.remove();
206                     installIncrement.put(group.getGroupId().getValue(), group);
207                     stepPlan.getItemsToPush().add(group);
208                 }
209             }
210
211             if (!stepPlan.isEmpty()) {
212                 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
213                 installedGroups.putAll(installIncrement);
214                 plan.add(stepPlan);
215             } else if (!pendingGroups.isEmpty()) {
216                 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
217                         + "resolving stuck at level {}", nodeId.getValue(), plan.size());
218                 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
219             }
220         }
221
222         return plan;
223     }
224
225     public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
226         boolean okToInstall = true;
227         // check each bucket in the pending group
228         for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
229             for (Action action : bucket.getAction()) {
230                 // if the output action is a group
231                 if (GroupActionCase.class.equals(action.getAction().implementedInterface())) {
232                     Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
233                     // see if that output group is installed
234                     if (!installedGroupIds.contains(groupId)) {
235                         // if not installed, we have missing dependencies and cannot install this pending group
236                         okToInstall = false;
237                         break;
238                     }
239                 }
240             }
241             if (!okToInstall) {
242                 break;
243             }
244         }
245         return okToInstall;
246     }
247
248     public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
249         int count = 0;
250         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
251             count += groupItemSyncBox.getItemsToPush().size();
252         }
253         return count;
254     }
255
256     public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
257         int count = 0;
258         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
259             count += groupItemSyncBox.getItemsToUpdate().size();
260         }
261         return count;
262     }
263
264     /**
265      * Resolves meter differences.
266      *
267      * @param nodeId              target node
268      * @param meterOperationalMap meters present on device
269      * @param metersConfigured    meters configured for device
270      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
271      * @return synchronization box
272      */
273     public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
274                                                        final Map<MeterId, Meter> meterOperationalMap,
275                                                        final List<Meter> metersConfigured,
276                                                        final boolean gatherUpdates) {
277         LOG.trace("resolving meters for {}", nodeId.getValue());
278         final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
279         for (Meter meter : metersConfigured) {
280             final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
281             if (existingMeter == null) {
282                 syncBox.getItemsToPush().add(meter);
283             } else {
284                 // compare content and eventually update
285                 if (gatherUpdates && !meter.equals(existingMeter)) {
286                     syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
287                 }
288             }
289         }
290         return syncBox;
291     }
292
293     /**
294      * Resolves flow differences in a table.
295      *
296      * @param flowsConfigured    flows resent on device
297      * @param flowOperationalMap flows configured for device
298      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
299      * @return list of safe synchronization steps
300      */
301     private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
302                                                             final Map<FlowDescriptor, Flow> flowOperationalMap,
303                                                             final boolean gatherUpdates) {
304         final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
305         // loop configured flows and check if already present on device
306         for (final Flow flow : flowsConfigured) {
307             final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
308
309             if (existingFlow == null) {
310                 flowsSyncBox.getItemsToPush().add(flow);
311             } else {
312                 // check instructions and eventually update
313                 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
314                     flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
315                 }
316             }
317         }
318         return flowsSyncBox;
319     }
320
321     /**
322      * Resolves flow differences in all tables.
323      *
324      * @param nodeId              target node
325      * @param tableOperationalMap flow-tables resent on device
326      * @param tablesConfigured    flow-tables configured for device
327      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
328      * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
329      */
330     public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
331             final Map<Short, Table> tableOperationalMap, final List<Table> tablesConfigured,
332             final boolean gatherUpdates) {
333         LOG.trace("resolving flows in tables for {}", nodeId.getValue());
334         final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
335         for (final Table tableConfigured : tablesConfigured) {
336             final List<Flow> flowsConfigured = tableConfigured.getFlow();
337             if (flowsConfigured == null || flowsConfigured.isEmpty()) {
338                 continue;
339             }
340
341             // lookup table (on device)
342             final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
343             // wrap existing (on device) flows in current table into map
344             final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
345                     tableOperational != null
346                             ? tableOperational.getFlow()
347                             : null);
348
349
350             final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
351                     flowsConfigured, flowOperationalMap, gatherUpdates);
352             if (!flowsSyncBox.isEmpty()) {
353                 tableFlowSyncBoxes.put(tableConfigured.key(), flowsSyncBox);
354             }
355         }
356         return tableFlowSyncBoxes;
357     }
358
359     public static List<Group> safeGroups(FlowCapableNode node) {
360         if (node == null) {
361             return Collections.emptyList();
362         }
363
364         return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.of());
365     }
366
367     public static List<Table> safeTables(FlowCapableNode node) {
368         if (node == null) {
369             return Collections.emptyList();
370         }
371
372         return MoreObjects.firstNonNull(node.getTable(), ImmutableList.of());
373     }
374
375     public static List<Meter> safeMeters(FlowCapableNode node) {
376         if (node == null) {
377             return Collections.emptyList();
378         }
379
380         return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.of());
381     }
382 }