Bump upstreams
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
9
10 import com.google.common.collect.Iterables;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.Map;
18 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
19 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
20 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
21 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
22 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
24 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.common.ErrorType;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
56  */
57 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
58     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
59
60     private FlowForwarder flowForwarder;
61     private MeterForwarder meterForwarder;
62     private GroupForwarder groupForwarder;
63     private FlowCapableTransactionService transactionService;
64
65     @Override
66     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
67                                                                  final SynchronizationDiffInput diffInput,
68                                                                  final SyncCrudCounters counters) {
69         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
70         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
71
72         /* Tables - have to be pushed before groups */
73         // TODO enable table-update when ready
74         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
75
76         resultVehicle = Futures.transformAsync(resultVehicle,
77             input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
78             MoreExecutors.directExecutor());
79         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
80             MoreExecutors.directExecutor());
81         resultVehicle = Futures.transformAsync(resultVehicle,
82             input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
83             MoreExecutors.directExecutor());
84         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
85             MoreExecutors.directExecutor());
86         resultVehicle = Futures.transformAsync(resultVehicle,
87             input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
88             MoreExecutors.directExecutor());
89         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
90             MoreExecutors.directExecutor());
91
92         resultVehicle = Futures.transformAsync(resultVehicle,
93             input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
94             MoreExecutors.directExecutor());
95         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
96             MoreExecutors.directExecutor());
97         resultVehicle = Futures.transformAsync(resultVehicle,
98             input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
99             MoreExecutors.directExecutor());
100         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
101             MoreExecutors.directExecutor());
102         resultVehicle = Futures.transformAsync(resultVehicle,
103             input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
104             MoreExecutors.directExecutor());
105         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
106             MoreExecutors.directExecutor());
107         return resultVehicle;
108     }
109
110
111     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
112                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
113                                                       final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
114                                                       final SyncCrudCounters counters) {
115         if (flowsInTablesSyncBox.isEmpty()) {
116             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
117             return RpcResultBuilder.<Void>success().buildFuture();
118         }
119
120         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
121         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
122         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
123
124         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
125             final TableKey tableKey = flowsInTableBoxEntry.getKey();
126             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
127
128             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
129
130             for (final Flow flow : flowSyncBox.getItemsToPush()) {
131                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
132
133                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
134                         flow.getId(), tableKey, nodeId, flow.getMatch());
135
136                 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
137                 flowCrudCounts.incAdded();
138             }
139
140             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
141                 final Flow existingFlow = flowUpdate.getOriginal();
142                 final Flow updatedFlow = flowUpdate.getUpdated();
143
144                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
145                         updatedFlow.key());
146                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
147                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
148
149                 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
150                 flowCrudCounts.incUpdated();
151             }
152         }
153
154         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
155                 Futures.allAsList(allResults),
156                 ReconcileUtil.createRpcResultCondenser("flow adding"),
157                 MoreExecutors.directExecutor());
158
159         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
160                 Futures.allAsList(allUpdateResults),
161                 ReconcileUtil.createRpcResultCondenser("flow updating"),
162                 MoreExecutors.directExecutor());
163
164         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
165                 ReconcileUtil.createRpcResultCondenser("flow add/update"),
166                 MoreExecutors.directExecutor());
167     }
168
169     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
170                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
171                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
172                                                            final SyncCrudCounters counters) {
173         if (removalPlan.isEmpty()) {
174             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
175             return RpcResultBuilder.<Void>success().buildFuture();
176         }
177
178         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
179         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
180
181         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
182             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
183                     nodeIdent.child(Table.class, flowsPerTable.getKey());
184
185             // loop flows on device and check if the are configured
186             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
187                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
188                         tableIdent.child(Flow.class, flow.key());
189                 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
190                 flowCrudCounts.incRemoved();
191             }
192         }
193
194         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
195                 Futures.allAsList(allResults),
196                 ReconcileUtil.createRpcResultCondenser("flow remove"),
197                 MoreExecutors.directExecutor());
198
199         return Futures.transformAsync(singleVoidResult,
200                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
201                 MoreExecutors.directExecutor());
202
203     }
204
205     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
206                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
207                                                             final ItemSyncBox<Meter> meterRemovalPlan,
208                                                             final SyncCrudCounters counters) {
209         if (meterRemovalPlan.isEmpty()) {
210             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
211             return RpcResultBuilder.<Void>success().buildFuture();
212         }
213
214         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
215
216         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
217         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
218             LOG.trace("removing meter {} - absent in config {}",
219                     meter.getMeterId(), nodeId);
220             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
221                     nodeIdent.child(Meter.class, meter.key());
222             allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
223             meterCrudCounts.incRemoved();
224         }
225
226         return Futures.transform(Futures.allAsList(allResults),
227                 ReconcileUtil.createRpcResultCondenser("meter remove"),
228                 MoreExecutors.directExecutor());
229     }
230
231     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
232                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
233                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
234                                                             final SyncCrudCounters counters) {
235         if (groupsRemovalPlan.isEmpty()) {
236             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
237             return RpcResultBuilder.<Void>success().buildFuture();
238         }
239
240         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
241
242         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
243         try {
244             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
245             if (LOG.isDebugEnabled()) {
246                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
247                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
248             }
249             Collections.reverse(groupsRemovalPlan);
250             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
251                 chainedResult = Futures.transformAsync(chainedResult, input -> {
252                     final ListenableFuture<RpcResult<Void>> result;
253                     if (input.isSuccessful()) {
254                         result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
255                     } else {
256                         // pass through original unsuccessful rpcResult
257                         result = Futures.immediateFuture(input);
258                     }
259
260                     return result;
261                 }, MoreExecutors.directExecutor());
262             }
263         } catch (IllegalStateException e) {
264             chainedResult = RpcResultBuilder.<Void>failed()
265                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
266                     .buildFuture();
267         }
268
269         return chainedResult;
270     }
271
272     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
273             final InstanceIdentifier<FlowCapableNode> nodeIdent,
274             final ItemSyncBox<Group> groupsPortion) {
275         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
276         for (Group group : groupsPortion.getItemsToPush()) {
277             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
278             allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
279         }
280
281         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
282                 Futures.allAsList(allResults),
283                 ReconcileUtil.createRpcResultCondenser("group remove"),
284                 MoreExecutors.directExecutor());
285
286         return Futures.transformAsync(singleVoidResult,
287                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
288                 MoreExecutors.directExecutor());
289     }
290
291     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
292                                                           final FlowCapableNode flowCapableNodeConfigured) {
293         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
294         //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
295
296         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
297 //        for (Table table : tableList) {
298 //            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
299 //            if (tableFeatures != null) {
300 //                for (TableFeatures tableFeaturesItem : tableFeatures) {
301 //                    // TODO uncomment java.lang.NullPointerException
302 //                    // at
303 //                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
304 //                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
305 //                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
306 //                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
307 //                }
308 //            }
309 //        }
310
311         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
312                 Futures.allAsList(allResults),
313                 ReconcileUtil.createRpcResultCondenser("table update"),
314                 MoreExecutors.directExecutor());
315
316         return Futures.transformAsync(singleVoidResult,
317                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
318                 MoreExecutors.directExecutor());
319     }
320
321     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
322             final InstanceIdentifier<FlowCapableNode> nodeIdent,
323             final ItemSyncBox<Group> groupsPortion) {
324         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
325         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
326
327         for (Group group : groupsPortion.getItemsToPush()) {
328             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
329             allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
330
331         }
332
333         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
334             final Group existingGroup = groupTuple.getOriginal();
335             final Group group = groupTuple.getUpdated();
336
337             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
338             allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
339         }
340
341         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
342                 Futures.allAsList(allResults),
343                 ReconcileUtil.createRpcResultCondenser("group add"),
344                 MoreExecutors.directExecutor());
345
346         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
347                 Futures.allAsList(allUpdateResults),
348                 ReconcileUtil.createRpcResultCondenser("group update"),
349                 MoreExecutors.directExecutor());
350
351         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
352                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
353                 ReconcileUtil.createRpcResultCondenser("group add/update"),
354                 MoreExecutors.directExecutor());
355
356
357         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
358                 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
359     }
360
361     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
362                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
363                                                        final ItemSyncBox<Meter> syncBox,
364                                                        final SyncCrudCounters counters) {
365         if (syncBox.isEmpty()) {
366             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
367             return RpcResultBuilder.<Void>success().buildFuture();
368         }
369
370         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
371
372         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
373         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
374         for (Meter meter : syncBox.getItemsToPush()) {
375             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
376             LOG.debug("adding meter {} - absent on device {}",
377                     meter.getMeterId(), nodeId);
378             allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
379             meterCrudCounts.incAdded();
380         }
381
382         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
383             final Meter existingMeter = meterTuple.getOriginal();
384             final Meter updated = meterTuple.getUpdated();
385             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
386             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
387             allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
388             meterCrudCounts.incUpdated();
389         }
390
391         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
392                 Futures.allAsList(allResults),
393                 ReconcileUtil.createRpcResultCondenser("meter add"),
394                 MoreExecutors.directExecutor());
395
396         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
397                 Futures.allAsList(allUpdateResults),
398                 ReconcileUtil.createRpcResultCondenser("meter update"),
399                 MoreExecutors.directExecutor());
400
401         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
402                 ReconcileUtil.createRpcResultCondenser("meter add/update"),
403                 MoreExecutors.directExecutor());
404     }
405
406     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
407                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
408                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
409                                                        final SyncCrudCounters counters) {
410         if (groupsAddPlan.isEmpty()) {
411             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
412             return RpcResultBuilder.<Void>success().buildFuture();
413         }
414
415         ListenableFuture<RpcResult<Void>> chainedResult;
416         try {
417             if (!groupsAddPlan.isEmpty()) {
418                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
419                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
420                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
421
422                 if (LOG.isDebugEnabled()) {
423                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
424                             groupsAddPlan.size(),
425                             groupCrudCounts.getAdded(),
426                             groupCrudCounts.getUpdated());
427                 }
428
429                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
430                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
431                     chainedResult =
432                         Futures.transformAsync(chainedResult, input -> {
433                             final ListenableFuture<RpcResult<Void>> result;
434                             if (input.isSuccessful()) {
435                                 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
436                             } else {
437                                 // pass through original unsuccessful rpcResult
438                                 result = Futures.immediateFuture(input);
439                             }
440
441                             return result;
442                         }, MoreExecutors.directExecutor());
443                 }
444             } else {
445                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
446             }
447         } catch (IllegalStateException e) {
448             chainedResult = RpcResultBuilder.<Void>failed()
449                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
450                     .buildFuture();
451         }
452
453         return chainedResult;
454     }
455
456
457     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
458         this.flowForwarder = flowForwarder;
459         return this;
460     }
461
462     @Deprecated(since = "0.17.2", forRemoval = true)
463     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
464         return this;
465     }
466
467     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
468         this.meterForwarder = meterForwarder;
469         return this;
470     }
471
472     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
473         this.groupForwarder = groupForwarder;
474         return this;
475     }
476
477     public SyncPlanPushStrategyIncrementalImpl setTransactionService(
478             final FlowCapableTransactionService transactionService) {
479         this.transactionService = transactionService;
480         return this;
481     }
482 }