ab0ad75f2d45a1f6f4de91970409d75e703d3f5d
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / util / ReconcileUtil.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.util;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterables;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Set;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.opendaylight.yangtools.yang.common.RpcError;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
/**
 * Util methods for group reconcile task (future chaining, transforms).
 */
52 public final class ReconcileUtil {
53
54     private static final Logger LOG = LoggerFactory.getLogger(ReconcileUtil.class);
55
56     private ReconcileUtil() {
57         throw new IllegalStateException("This class should not be instantiated.");
58     }
59
60     /**
61      * Creates a single rpc result of type Void honoring all partial rpc results.
62      *
63      * @param previousItemAction description for case when the triggering future contains failure
64      * @param <D>                type of rpc output (gathered in list)
65      * @return single rpc result of type Void honoring all partial rpc results
66      */
67     public static <D> Function<List<RpcResult<D>>, RpcResult<Void>> createRpcResultCondenser(
68             final String previousItemAction) {
69         return input -> {
70             final RpcResultBuilder<Void> resultSink;
71             if (input != null) {
72                 List<RpcError> errors = new ArrayList<>();
73                 for (RpcResult<D> rpcResult : input) {
74                     if (!rpcResult.isSuccessful()) {
75                         errors.addAll(rpcResult.getErrors());
76                     }
77                 }
78                 if (errors.isEmpty()) {
79                     resultSink = RpcResultBuilder.success();
80                 } else {
81                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
82                 }
83             } else {
84                 resultSink = RpcResultBuilder.<Void>failed()
85                         .withError(RpcError.ErrorType.APPLICATION, "previous " + previousItemAction + " failed");
86             }
87             return resultSink.build();
88         };
89     }
90
91     /**
92      * Creates a single rpc result of type Void honoring all partial rpc results.
93      *
94      * @param actionDescription description for case when the triggering future contains failure
95      * @param <D>               type of rpc output (gathered in list)
96      * @return single rpc result of type Void honoring all partial rpc results
97      */
98     public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(
99             final String actionDescription) {
100         return input -> {
101             final RpcResultBuilder<Void> resultSink;
102             if (input != null) {
103                 List<RpcError> errors = new ArrayList<>();
104                 if (!input.isSuccessful()) {
105                     errors.addAll(input.getErrors());
106                     resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
107                 } else {
108                     resultSink = RpcResultBuilder.success();
109                 }
110             } else {
111                 resultSink = RpcResultBuilder.<Void>failed()
112                         .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
113             }
114             return resultSink.build();
115         };
116     }
117
118     /**
119      * Flushes a chain barrier.
120      *
121      * @param nodeIdent flow capable node path - target device for routed rpc
122      * @param flowCapableTransactionService barrier rpc service
123      * @return async barrier result
124      */
125     public static AsyncFunction<RpcResult<Void>, RpcResult<Void>> chainBarrierFlush(
126             final InstanceIdentifier<Node> nodeIdent,
127             final FlowCapableTransactionService flowCapableTransactionService) {
128         return input -> {
129             final SendBarrierInput barrierInput = new SendBarrierInputBuilder()
130                     .setNode(new NodeRef(nodeIdent))
131                     .build();
132             return JdkFutureAdapters.listenInPoolThread(flowCapableTransactionService.sendBarrier(barrierInput));
133         };
134     }
135
136     /**
137      * Returns a list of safe synchronization steps with updates.
138      *
139      * @param nodeId target node
140      * @param installedGroupsArg groups resent on device
141      * @param pendingGroups      groups configured for device
142      * @return list of safe synchronization steps with updates
143      */
144     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
145                                                                       final Map<Long, Group> installedGroupsArg,
146                                                                       final Collection<Group> pendingGroups) {
147         return resolveAndDivideGroupDiffs(nodeId, installedGroupsArg, pendingGroups, true);
148     }
149
150     /**
151      * Returns a list of safe synchronization steps.
152      *
153      * @param nodeId             target node
154      * @param installedGroupsArg groups resent on device
155      * @param pendingGroups      groups configured for device
156      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
157      * @return list of safe synchronization steps
158      */
159     public static List<ItemSyncBox<Group>> resolveAndDivideGroupDiffs(final NodeId nodeId,
160                                                                       final Map<Long, Group> installedGroupsArg,
161                                                                       final Collection<Group> pendingGroups,
162                                                                       final boolean gatherUpdates) {
163         final Map<Long, Group> installedGroups = new HashMap<>(installedGroupsArg);
164         final List<ItemSyncBox<Group>> plan = new ArrayList<>();
165
166         while (!Iterables.isEmpty(pendingGroups)) {
167             final ItemSyncBox<Group> stepPlan = new ItemSyncBox<>();
168             final Iterator<Group> iterator = pendingGroups.iterator();
169             final Map<Long, Group> installIncrement = new HashMap<>();
170
171             while (iterator.hasNext()) {
172                 final Group group = iterator.next();
173
174                 final Group existingGroup = installedGroups.get(group.getGroupId().getValue());
175                 if (existingGroup != null) {
176                     if (!gatherUpdates) {
177                         iterator.remove();
178                     } else {
179                         // check buckets and eventually update
180                         if (group.equals(existingGroup)) {
181                             iterator.remove();
182                         } else {
183                             if (checkGroupPrecondition(installedGroups.keySet(), group)) {
184                                 iterator.remove();
185                                 LOG.trace("Group {} on device {} differs - planned for update", group.getGroupId(),
186                                         nodeId);
187                                 stepPlan.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingGroup,
188                                         group));
189                             }
190                         }
191                     }
192                 } else if (checkGroupPrecondition(installedGroups.keySet(), group)) {
193                     iterator.remove();
194                     installIncrement.put(group.getGroupId().getValue(), group);
195                     stepPlan.getItemsToPush().add(group);
196                 }
197             }
198
199             if (!stepPlan.isEmpty()) {
200                 // atomic update of installed flows in order to keep plan portions clean of local group dependencies
201                 installedGroups.putAll(installIncrement);
202                 plan.add(stepPlan);
203             } else if (!pendingGroups.isEmpty()) {
204                 LOG.warn("Failed to resolve and divide groups into preconditions-match based ordered plan: {}, "
205                         + "resolving stuck at level {}", nodeId.getValue(), plan.size());
206                 throw new IllegalStateException("Failed to resolve and divide groups when matching preconditions");
207             }
208         }
209
210         return plan;
211     }
212
213     public static boolean checkGroupPrecondition(final Set<Long> installedGroupIds, final Group pendingGroup) {
214         boolean okToInstall = true;
215         // check each bucket in the pending group
216         for (Bucket bucket : pendingGroup.getBuckets().getBucket()) {
217             for (Action action : bucket.getAction()) {
218                 // if the output action is a group
219                 if (GroupActionCase.class.equals(action.getAction().getImplementedInterface())) {
220                     Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
221                     // see if that output group is installed
222                     if (!installedGroupIds.contains(groupId)) {
223                         // if not installed, we have missing dependencies and cannot install this pending group
224                         okToInstall = false;
225                         break;
226                     }
227                 }
228             }
229             if (!okToInstall) {
230                 break;
231             }
232         }
233         return okToInstall;
234     }
235
236     public static <E> int countTotalPushed(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
237         int count = 0;
238         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
239             count += groupItemSyncBox.getItemsToPush().size();
240         }
241         return count;
242     }
243
244     public static <E> int countTotalUpdated(final Iterable<ItemSyncBox<E>> groupsAddPlan) {
245         int count = 0;
246         for (ItemSyncBox<E> groupItemSyncBox : groupsAddPlan) {
247             count += groupItemSyncBox.getItemsToUpdate().size();
248         }
249         return count;
250     }
251
252     /**
253      * Resolves meter differences.
254      *
255      * @param nodeId              target node
256      * @param meterOperationalMap meters present on device
257      * @param metersConfigured    meters configured for device
258      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
259      * @return synchronization box
260      */
261     public static ItemSyncBox<Meter> resolveMeterDiffs(final NodeId nodeId,
262                                                        final Map<MeterId, Meter> meterOperationalMap,
263                                                        final List<Meter> metersConfigured,
264                                                        final boolean gatherUpdates) {
265         LOG.trace("resolving meters for {}", nodeId.getValue());
266         final ItemSyncBox<Meter> syncBox = new ItemSyncBox<>();
267         for (Meter meter : metersConfigured) {
268             final Meter existingMeter = meterOperationalMap.get(meter.getMeterId());
269             if (existingMeter == null) {
270                 syncBox.getItemsToPush().add(meter);
271             } else {
272                 // compare content and eventually update
273                 if (gatherUpdates && !meter.equals(existingMeter)) {
274                     syncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingMeter, meter));
275                 }
276             }
277         }
278         return syncBox;
279     }
280
281     /**
282      * Resolves flow differences in a table.
283      *
284      * @param flowsConfigured    flows resent on device
285      * @param flowOperationalMap flows configured for device
286      * @param gatherUpdates      check content of pending item if present on device (and create update task eventually)
287      * @return list of safe synchronization steps
288      */
289     private static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
290                                                             final Map<FlowDescriptor, Flow> flowOperationalMap,
291                                                             final boolean gatherUpdates) {
292         final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
293         // loop configured flows and check if already present on device
294         for (final Flow flow : flowsConfigured) {
295             final Flow existingFlow = FlowCapableNodeLookups.flowMapLookupExisting(flow, flowOperationalMap);
296
297             if (existingFlow == null) {
298                 flowsSyncBox.getItemsToPush().add(flow);
299             } else {
300                 // check instructions and eventually update
301                 if (gatherUpdates && !Objects.equals(flow.getInstructions(), existingFlow.getInstructions())) {
302                     flowsSyncBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(existingFlow, flow));
303                 }
304             }
305         }
306         return flowsSyncBox;
307     }
308
309     /**
310      * Resolves flow differences in all tables.
311      *
312      * @param nodeId              target node
313      * @param tableOperationalMap flow-tables resent on device
314      * @param tablesConfigured    flow-tables configured for device
315      * @param gatherUpdates       check content of pending item if present on device (and create update task eventually)
316      * @return map : key={@link TableKey}, value={@link ItemSyncBox} of safe synchronization steps
317      */
318     public static Map<TableKey, ItemSyncBox<Flow>> resolveFlowDiffsInAllTables(final NodeId nodeId,
319             final Map<Short, Table> tableOperationalMap, final List<Table> tablesConfigured,
320             final boolean gatherUpdates) {
321         LOG.trace("resolving flows in tables for {}", nodeId.getValue());
322         final Map<TableKey, ItemSyncBox<Flow>> tableFlowSyncBoxes = new HashMap<>();
323         for (final Table tableConfigured : tablesConfigured) {
324             final List<Flow> flowsConfigured = tableConfigured.getFlow();
325             if (flowsConfigured == null || flowsConfigured.isEmpty()) {
326                 continue;
327             }
328
329             // lookup table (on device)
330             final Table tableOperational = tableOperationalMap.get(tableConfigured.getId());
331             // wrap existing (on device) flows in current table into map
332             final Map<FlowDescriptor, Flow> flowOperationalMap = FlowCapableNodeLookups.wrapFlowsToMap(
333                     tableOperational != null
334                             ? tableOperational.getFlow()
335                             : null);
336
337
338             final ItemSyncBox<Flow> flowsSyncBox = resolveFlowDiffsInTable(
339                     flowsConfigured, flowOperationalMap, gatherUpdates);
340             if (!flowsSyncBox.isEmpty()) {
341                 tableFlowSyncBoxes.put(tableConfigured.getKey(), flowsSyncBox);
342             }
343         }
344         return tableFlowSyncBoxes;
345     }
346
347     public static List<Group> safeGroups(FlowCapableNode node) {
348         if (node == null) {
349             return Collections.emptyList();
350         }
351
352         return MoreObjects.firstNonNull(node.getGroup(), ImmutableList.<Group>of());
353     }
354
355     public static List<Table> safeTables(FlowCapableNode node) {
356         if (node == null) {
357             return Collections.emptyList();
358         }
359
360         return MoreObjects.firstNonNull(node.getTable(), ImmutableList.<Table>of());
361     }
362
363     public static List<Meter> safeMeters(FlowCapableNode node) {
364         if (node == null) {
365             return Collections.emptyList();
366         }
367
368         return MoreObjects.firstNonNull(node.getMeter(), ImmutableList.<Meter>of());
369     }
370 }