Do not use JdkFutureAdapters
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Iterables;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
22 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
24 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrier;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.ErrorType;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
58  */
59 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
60     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
61
62     private final FlowForwarder flowForwarder;
63     private final MeterForwarder meterForwarder;
64     private final GroupForwarder groupForwarder;
65     private final SendBarrier sendBarrier;
66
67     public SyncPlanPushStrategyIncrementalImpl(final FlowForwarder flowForwarder, final MeterForwarder meterForwarder,
68             final GroupForwarder groupForwarder, final SendBarrier sendBarrier) {
69         this.flowForwarder = requireNonNull(flowForwarder);
70         this.meterForwarder = requireNonNull(meterForwarder);
71         this.groupForwarder = requireNonNull(groupForwarder);
72         this.sendBarrier = requireNonNull(sendBarrier);
73     }
74
75     @Override
76     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
77             final SynchronizationDiffInput diffInput, final SyncCrudCounters counters) {
78         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
79         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
80
81         /* Tables - have to be pushed before groups */
82         // TODO enable table-update when ready
83         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
84
85         resultVehicle = Futures.transformAsync(resultVehicle,
86             input -> addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters),
87             MoreExecutors.directExecutor());
88         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
89             MoreExecutors.directExecutor());
90         resultVehicle = Futures.transformAsync(resultVehicle,
91             input -> addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters),
92             MoreExecutors.directExecutor());
93         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
94             MoreExecutors.directExecutor());
95         resultVehicle = Futures.transformAsync(resultVehicle,
96             input -> addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters),
97             MoreExecutors.directExecutor());
98         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
99             MoreExecutors.directExecutor());
100
101         resultVehicle = Futures.transformAsync(resultVehicle,
102             input -> removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters),
103             MoreExecutors.directExecutor());
104         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
105             MoreExecutors.directExecutor());
106         resultVehicle = Futures.transformAsync(resultVehicle,
107             input -> removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters),
108             MoreExecutors.directExecutor());
109         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
110             MoreExecutors.directExecutor());
111         resultVehicle = Futures.transformAsync(resultVehicle,
112             input -> removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters),
113             MoreExecutors.directExecutor());
114         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
115             MoreExecutors.directExecutor());
116         return resultVehicle;
117     }
118
119
120     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
121             final InstanceIdentifier<FlowCapableNode> nodeIdent,
122             final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox, final SyncCrudCounters counters) {
123         if (flowsInTablesSyncBox.isEmpty()) {
124             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
125             return RpcResultBuilder.<Void>success().buildFuture();
126         }
127
128         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
129         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
130         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
131
132         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
133             final TableKey tableKey = flowsInTableBoxEntry.getKey();
134             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
135
136             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
137
138             for (final Flow flow : flowSyncBox.getItemsToPush()) {
139                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.key());
140
141                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
142                         flow.getId(), tableKey, nodeId, flow.getMatch());
143
144                 allResults.add(flowForwarder.add(flowIdent, flow, nodeIdent));
145                 flowCrudCounts.incAdded();
146             }
147
148             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
149                 final Flow existingFlow = flowUpdate.getOriginal();
150                 final Flow updatedFlow = flowUpdate.getUpdated();
151
152                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
153                         updatedFlow.key());
154                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
155                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
156
157                 allUpdateResults.add(flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent));
158                 flowCrudCounts.incUpdated();
159             }
160         }
161
162         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
163                 Futures.allAsList(allResults),
164                 ReconcileUtil.createRpcResultCondenser("flow adding"),
165                 MoreExecutors.directExecutor());
166
167         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
168                 Futures.allAsList(allUpdateResults),
169                 ReconcileUtil.createRpcResultCondenser("flow updating"),
170                 MoreExecutors.directExecutor());
171
172         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
173                 ReconcileUtil.createRpcResultCondenser("flow add/update"),
174                 MoreExecutors.directExecutor());
175     }
176
177     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
178                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
179                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
180                                                            final SyncCrudCounters counters) {
181         if (removalPlan.isEmpty()) {
182             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
183             return RpcResultBuilder.<Void>success().buildFuture();
184         }
185
186         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
187         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
188
189         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
190             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
191                     nodeIdent.child(Table.class, flowsPerTable.getKey());
192
193             // loop flows on device and check if the are configured
194             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
195                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
196                         tableIdent.child(Flow.class, flow.key());
197                 allResults.add(flowForwarder.remove(flowIdent, flow, nodeIdent));
198                 flowCrudCounts.incRemoved();
199             }
200         }
201
202         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
203                 Futures.allAsList(allResults),
204                 ReconcileUtil.createRpcResultCondenser("flow remove"),
205                 MoreExecutors.directExecutor());
206
207         return Futures.transformAsync(singleVoidResult,
208                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
209                 MoreExecutors.directExecutor());
210
211     }
212
213     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
214                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
215                                                             final ItemSyncBox<Meter> meterRemovalPlan,
216                                                             final SyncCrudCounters counters) {
217         if (meterRemovalPlan.isEmpty()) {
218             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
219             return RpcResultBuilder.<Void>success().buildFuture();
220         }
221
222         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
223
224         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
225         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
226             LOG.trace("removing meter {} - absent in config {}",
227                     meter.getMeterId(), nodeId);
228             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
229                     nodeIdent.child(Meter.class, meter.key());
230             allResults.add(meterForwarder.remove(meterIdent, meter, nodeIdent));
231             meterCrudCounts.incRemoved();
232         }
233
234         return Futures.transform(Futures.allAsList(allResults),
235                 ReconcileUtil.createRpcResultCondenser("meter remove"),
236                 MoreExecutors.directExecutor());
237     }
238
239     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
240                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
241                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
242                                                             final SyncCrudCounters counters) {
243         if (groupsRemovalPlan.isEmpty()) {
244             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
245             return RpcResultBuilder.<Void>success().buildFuture();
246         }
247
248         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
249
250         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
251         try {
252             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
253             if (LOG.isDebugEnabled()) {
254                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
255                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
256             }
257             Collections.reverse(groupsRemovalPlan);
258             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
259                 chainedResult = Futures.transformAsync(chainedResult, input -> {
260                     final ListenableFuture<RpcResult<Void>> result;
261                     if (input.isSuccessful()) {
262                         result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
263                     } else {
264                         // pass through original unsuccessful rpcResult
265                         result = Futures.immediateFuture(input);
266                     }
267
268                     return result;
269                 }, MoreExecutors.directExecutor());
270             }
271         } catch (IllegalStateException e) {
272             chainedResult = RpcResultBuilder.<Void>failed()
273                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
274                     .buildFuture();
275         }
276
277         return chainedResult;
278     }
279
280     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
281             final InstanceIdentifier<FlowCapableNode> nodeIdent,
282             final ItemSyncBox<Group> groupsPortion) {
283         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
284         for (Group group : groupsPortion.getItemsToPush()) {
285             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
286             allResults.add(groupForwarder.remove(groupIdent, group, nodeIdent));
287         }
288
289         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
290                 Futures.allAsList(allResults),
291                 ReconcileUtil.createRpcResultCondenser("group remove"),
292                 MoreExecutors.directExecutor());
293
294         return Futures.transformAsync(singleVoidResult,
295                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
296                 MoreExecutors.directExecutor());
297     }
298
299     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
300                                                           final FlowCapableNode flowCapableNodeConfigured) {
301         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
302         //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
303
304         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
305 //        for (Table table : tableList) {
306 //            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
307 //            if (tableFeatures != null) {
308 //                for (TableFeatures tableFeaturesItem : tableFeatures) {
309 //                    // TODO uncomment java.lang.NullPointerException
310 //                    // at
311 //                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
312 //                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
313 //                    // allResults.add(
314 //                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent));
315 //                }
316 //            }
317 //        }
318
319         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
320                 Futures.allAsList(allResults),
321                 ReconcileUtil.createRpcResultCondenser("table update"),
322                 MoreExecutors.directExecutor());
323
324         return Futures.transformAsync(singleVoidResult,
325                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), sendBarrier),
326                 MoreExecutors.directExecutor());
327     }
328
329     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
330             final InstanceIdentifier<FlowCapableNode> nodeIdent,
331             final ItemSyncBox<Group> groupsPortion) {
332         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
333         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
334
335         for (Group group : groupsPortion.getItemsToPush()) {
336             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
337             allResults.add(groupForwarder.add(groupIdent, group, nodeIdent));
338
339         }
340
341         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
342             final Group existingGroup = groupTuple.getOriginal();
343             final Group group = groupTuple.getUpdated();
344
345             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.key());
346             allUpdateResults.add(groupForwarder.update(groupIdent, existingGroup, group, nodeIdent));
347         }
348
349         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
350                 Futures.allAsList(allResults),
351                 ReconcileUtil.createRpcResultCondenser("group add"),
352                 MoreExecutors.directExecutor());
353
354         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
355                 Futures.allAsList(allUpdateResults),
356                 ReconcileUtil.createRpcResultCondenser("group update"),
357                 MoreExecutors.directExecutor());
358
359         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
360                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
361                 ReconcileUtil.createRpcResultCondenser("group add/update"),
362                 MoreExecutors.directExecutor());
363
364
365         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
366                 PathUtil.digNodePath(nodeIdent), sendBarrier), MoreExecutors.directExecutor());
367     }
368
369     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
370                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
371                                                        final ItemSyncBox<Meter> syncBox,
372                                                        final SyncCrudCounters counters) {
373         if (syncBox.isEmpty()) {
374             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
375             return RpcResultBuilder.<Void>success().buildFuture();
376         }
377
378         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
379
380         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
381         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
382         for (Meter meter : syncBox.getItemsToPush()) {
383             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.key());
384             LOG.debug("adding meter {} - absent on device {}",
385                     meter.getMeterId(), nodeId);
386             allResults.add(meterForwarder.add(meterIdent, meter, nodeIdent));
387             meterCrudCounts.incAdded();
388         }
389
390         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
391             final Meter existingMeter = meterTuple.getOriginal();
392             final Meter updated = meterTuple.getUpdated();
393             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.key());
394             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
395             allUpdateResults.add(meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent));
396             meterCrudCounts.incUpdated();
397         }
398
399         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
400                 Futures.allAsList(allResults),
401                 ReconcileUtil.createRpcResultCondenser("meter add"),
402                 MoreExecutors.directExecutor());
403
404         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
405                 Futures.allAsList(allUpdateResults),
406                 ReconcileUtil.createRpcResultCondenser("meter update"),
407                 MoreExecutors.directExecutor());
408
409         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
410                 ReconcileUtil.createRpcResultCondenser("meter add/update"),
411                 MoreExecutors.directExecutor());
412     }
413
414     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
415                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
416                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
417                                                        final SyncCrudCounters counters) {
418         if (groupsAddPlan.isEmpty()) {
419             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
420             return RpcResultBuilder.<Void>success().buildFuture();
421         }
422
423         ListenableFuture<RpcResult<Void>> chainedResult;
424         try {
425             if (!groupsAddPlan.isEmpty()) {
426                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
427                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
428                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
429
430                 if (LOG.isDebugEnabled()) {
431                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
432                             groupsAddPlan.size(),
433                             groupCrudCounts.getAdded(),
434                             groupCrudCounts.getUpdated());
435                 }
436
437                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
438                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
439                     chainedResult =
440                         Futures.transformAsync(chainedResult, input -> {
441                             final ListenableFuture<RpcResult<Void>> result;
442                             if (input.isSuccessful()) {
443                                 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
444                             } else {
445                                 // pass through original unsuccessful rpcResult
446                                 result = Futures.immediateFuture(input);
447                             }
448
449                             return result;
450                         }, MoreExecutors.directExecutor());
451                 }
452             } else {
453                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
454             }
455         } catch (IllegalStateException e) {
456             chainedResult = RpcResultBuilder.<Void>failed()
457                     .withError(ErrorType.APPLICATION, "failed to add missing groups", e)
458                     .buildFuture();
459         }
460
461         return chainedResult;
462     }
463 }