Bump MRI upstreams
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
9
10 import com.google.common.collect.Iterables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.Map;
18 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
19 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
20 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
21 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
22 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
24 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.ErrorType;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
56  */
57 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
58
59     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
60
61     private FlowForwarder flowForwarder;
62     private MeterForwarder meterForwarder;
63     private GroupForwarder groupForwarder;
64     private TableForwarder tableForwarder;
65     private FlowCapableTransactionService transactionService;
66
67     @Override
68     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
69                                                                  final SynchronizationDiffInput diffInput,
70                                                                  final SyncCrudCounters counters) {
71         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
72         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
73
74         /* Tables - have to be pushed before groups */
75         // TODO enable table-update when ready
76         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
77
78         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
79             // if (!input.isSuccessful()) {
80                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
81                 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
82                 //        Futures.asList Arrays.asList(input, output),
83                 //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
84             // }
85             return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
86         }, MoreExecutors.directExecutor());
87         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
88                 MoreExecutors.directExecutor());
89         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
90             // if (!input.isSuccessful()) {
91                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
92             // }
93             return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
94         }, MoreExecutors.directExecutor());
95         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
96                 MoreExecutors.directExecutor());
97         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
98             // if (!input.isSuccessful()) {
99                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
100             // }
101             return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
102         }, MoreExecutors.directExecutor());
103         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
104                 MoreExecutors.directExecutor());
105
106
107         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
108             // if (!input.isSuccessful()) {
109                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
110             // }
111             return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
112         }, MoreExecutors.directExecutor());
113         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
114                 MoreExecutors.directExecutor());
115         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
116             // if (!input.isSuccessful()) {
117                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
118             // }
119             return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
120         }, MoreExecutors.directExecutor());
121         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
122                 MoreExecutors.directExecutor());
123         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
124             // if (!input.isSuccessful()) {
125                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
126             // }
127             return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
128         }, MoreExecutors.directExecutor());
129         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
130                 MoreExecutors.directExecutor());
131         return resultVehicle;
132     }
133
134
135     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
136                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
137                                                       final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
138                                                       final SyncCrudCounters counters) {
139         if (flowsInTablesSyncBox.isEmpty()) {
140             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
141             return RpcResultBuilder.<Void>success().buildFuture();
142         }
143
144         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
145         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
146         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
147
148         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
149             final TableKey tableKey = flowsInTableBoxEntry.getKey();
150             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
151
152             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
153
154             for (final Flow flow : flowSyncBox.getItemsToPush()) {
155                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
156
157                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
158                         flow.getId(), tableKey, nodeId, flow.getMatch());
159
160                 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
161                 flowCrudCounts.incAdded();
162             }
163
164             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
165                 final Flow existingFlow = flowUpdate.getOriginal();
166                 final Flow updatedFlow = flowUpdate.getUpdated();
167
168                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
169                         updatedFlow.key());
170                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
171                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
172
173                 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
174                 flowCrudCounts.incUpdated();
175             }
176         }
177
178         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
179                 Futures.allAsList(allResults),
180                 ReconcileUtil.createRpcResultCondenser("flow adding"),
181                 MoreExecutors.directExecutor());
182
183         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
184                 Futures.allAsList(allUpdateResults),
185                 ReconcileUtil.createRpcResultCondenser("flow updating"),
186                 MoreExecutors.directExecutor());
187
188         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
189                 ReconcileUtil.createRpcResultCondenser("flow add/update"),
190                 MoreExecutors.directExecutor());
191     }
192
193     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
194                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
195                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
196                                                            final SyncCrudCounters counters) {
197         if (removalPlan.isEmpty()) {
198             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
199             return RpcResultBuilder.<Void>success().buildFuture();
200         }
201
202         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
203         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
204
205         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
206             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
207                     nodeIdent.child(Table.class, flowsPerTable.getKey());
208
209             // loop flows on device and check if the are configured
210             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
211                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
212                         tableIdent.child(Flow.class, flow.key());
213                 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
214                 flowCrudCounts.incRemoved();
215             }
216         }
217
218         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
219                 Futures.allAsList(allResults),
220                 ReconcileUtil.createRpcResultCondenser("flow remove"),
221                 MoreExecutors.directExecutor());
222
223         return Futures.transformAsync(singleVoidResult,
224                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
225                 MoreExecutors.directExecutor());
226
227     }
228
229     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
230                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
231                                                             final ItemSyncBox<Meter> meterRemovalPlan,
232                                                             final SyncCrudCounters counters) {
233         if (meterRemovalPlan.isEmpty()) {
234             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
235             return RpcResultBuilder.<Void>success().buildFuture();
236         }
237
238         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
239
240         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
241         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
242             LOG.trace("removing meter {} - absent in config {}",
243                     meter.getMeterId(), nodeId);
244             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
245                     nodeIdent.child(Meter.class, meter.key());
246             allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
247             meterCrudCounts.incRemoved();
248         }
249
250         return Futures.transform(Futures.allAsList(allResults),
251                 ReconcileUtil.createRpcResultCondenser("meter remove"),
252                 MoreExecutors.directExecutor());
253     }
254
255     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
256                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
257                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
258                                                             final SyncCrudCounters counters) {
259         if (groupsRemovalPlan.isEmpty()) {
260             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
261             return RpcResultBuilder.<Void>success().buildFuture();
262         }
263
264         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
265
266         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
267         try {
268             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
269             if (LOG.isDebugEnabled()) {
270                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
271                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
272             }
273             Collections.reverse(groupsRemovalPlan);
274             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
275                 chainedResult = Futures.transformAsync(chainedResult, input -> {
276                     final ListenableFuture<RpcResult<Void>> result;
277                     if (input.isSuccessful()) {
278                         result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
279                     } else {
280                         // pass through original unsuccessful rpcResult
281                         result = Futures.immediateFuture(input);
282                     }
283
284                     return result;
285                 }, MoreExecutors.directExecutor());
286             }
287         } catch (IllegalStateException e) {
288             chainedResult = RpcResultBuilder.<Void>failed()
289                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
290                     .buildFuture();
291         }
292
293         return chainedResult;
294     }
295
296     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
297             final InstanceIdentifier<FlowCapableNode> nodeIdent,
298             final ItemSyncBox<Group> groupsPortion) {
299         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
300         for (Group group : groupsPortion.getItemsToPush()) {
301             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
302             allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
303         }
304
305         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
306                 Futures.allAsList(allResults),
307                 ReconcileUtil.createRpcResultCondenser("group remove"),
308                 MoreExecutors.directExecutor());
309
310         return Futures.transformAsync(singleVoidResult,
311                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
312                 MoreExecutors.directExecutor());
313     }
314
315     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
316                                                           final FlowCapableNode flowCapableNodeConfigured) {
317         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
318         //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
319
320         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
321 //        for (Table table : tableList) {
322 //            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
323 //            if (tableFeatures != null) {
324 //                for (TableFeatures tableFeaturesItem : tableFeatures) {
325 //                    // TODO uncomment java.lang.NullPointerException
326 //                    // at
327 //                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
328 //                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
329 //                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
330 //                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
331 //                }
332 //            }
333 //        }
334
335         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
336                 Futures.allAsList(allResults),
337                 ReconcileUtil.createRpcResultCondenser("table update"),
338                 MoreExecutors.directExecutor());
339
340         return Futures.transformAsync(singleVoidResult,
341                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
342                 MoreExecutors.directExecutor());
343     }
344
345     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
346             final InstanceIdentifier<FlowCapableNode> nodeIdent,
347             final ItemSyncBox<Group> groupsPortion) {
348         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
349         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
350
351         for (Group group : groupsPortion.getItemsToPush()) {
352             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
353             allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
354
355         }
356
357         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
358             final Group existingGroup = groupTuple.getOriginal();
359             final Group group = groupTuple.getUpdated();
360
361             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
362             allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
363         }
364
365         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
366                 Futures.allAsList(allResults),
367                 ReconcileUtil.createRpcResultCondenser("group add"),
368                 MoreExecutors.directExecutor());
369
370         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
371                 Futures.allAsList(allUpdateResults),
372                 ReconcileUtil.createRpcResultCondenser("group update"),
373                 MoreExecutors.directExecutor());
374
375         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
376                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
377                 ReconcileUtil.createRpcResultCondenser("group add/update"),
378                 MoreExecutors.directExecutor());
379
380
381         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
382                 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
383     }
384
385     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
386                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
387                                                        final ItemSyncBox<Meter> syncBox,
388                                                        final SyncCrudCounters counters) {
389         if (syncBox.isEmpty()) {
390             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
391             return RpcResultBuilder.<Void>success().buildFuture();
392         }
393
394         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
395
396         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
397         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
398         for (Meter meter : syncBox.getItemsToPush()) {
399             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
400             LOG.debug("adding meter {} - absent on device {}",
401                     meter.getMeterId(), nodeId);
402             allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
403             meterCrudCounts.incAdded();
404         }
405
406         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
407             final Meter existingMeter = meterTuple.getOriginal();
408             final Meter updated = meterTuple.getUpdated();
409             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
410             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
411             allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
412             meterCrudCounts.incUpdated();
413         }
414
415         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
416                 Futures.allAsList(allResults),
417                 ReconcileUtil.createRpcResultCondenser("meter add"),
418                 MoreExecutors.directExecutor());
419
420         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
421                 Futures.allAsList(allUpdateResults),
422                 ReconcileUtil.createRpcResultCondenser("meter update"),
423                 MoreExecutors.directExecutor());
424
425         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
426                 ReconcileUtil.createRpcResultCondenser("meter add/update"),
427                 MoreExecutors.directExecutor());
428     }
429
430     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
431                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
432                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
433                                                        final SyncCrudCounters counters) {
434         if (groupsAddPlan.isEmpty()) {
435             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
436             return RpcResultBuilder.<Void>success().buildFuture();
437         }
438
439         ListenableFuture<RpcResult<Void>> chainedResult;
440         try {
441             if (!groupsAddPlan.isEmpty()) {
442                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
443                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
444                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
445
446                 if (LOG.isDebugEnabled()) {
447                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
448                             groupsAddPlan.size(),
449                             groupCrudCounts.getAdded(),
450                             groupCrudCounts.getUpdated());
451                 }
452
453                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
454                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
455                     chainedResult =
456                         Futures.transformAsync(chainedResult, input -> {
457                             final ListenableFuture<RpcResult<Void>> result;
458                             if (input.isSuccessful()) {
459                                 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
460                             } else {
461                                 // pass through original unsuccessful rpcResult
462                                 result = Futures.immediateFuture(input);
463                             }
464
465                             return result;
466                         }, MoreExecutors.directExecutor());
467                 }
468             } else {
469                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
470             }
471         } catch (IllegalStateException e) {
472             chainedResult = RpcResultBuilder.<Void>failed()
473                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
474                     .buildFuture();
475         }
476
477         return chainedResult;
478     }
479
480
481     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
482         this.flowForwarder = flowForwarder;
483         return this;
484     }
485
486     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
487         this.tableForwarder = tableForwarder;
488         return this;
489     }
490
491     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
492         this.meterForwarder = meterForwarder;
493         return this;
494     }
495
496     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
497         this.groupForwarder = groupForwarder;
498         return this;
499     }
500
501     public SyncPlanPushStrategyIncrementalImpl setTransactionService(
502             final FlowCapableTransactionService transactionService) {
503         this.transactionService = transactionService;
504         return this;
505     }
506
507 }