Merge "OPNFLWPLUG-929 : Remove deprecated guava library"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Map;
21 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
22 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
23 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
24 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
25 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.common.RpcError;
54 import org.opendaylight.yangtools.yang.common.RpcResult;
55 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
61  */
62 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
63
    private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);

    // Collaborators: none of them is initialised here, each one is injected through its fluent setter.

    // Issues the flow add / update / remove requests.
    private FlowForwarder flowForwarder;
    // Issues the meter add / update / remove requests.
    private MeterForwarder meterForwarder;
    // Issues the group add / update / remove requests.
    private GroupForwarder groupForwarder;
    // Currently referenced only from the disabled (commented-out) push in updateTableFeatures.
    private TableForwarder tableForwarder;
    // Handed to ReconcileUtil.chainBarrierFlush wherever a flush step is appended to a result chain.
    private FlowCapableTransactionService transactionService;
71
72     @Override
73     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
74                                                                  final SynchronizationDiffInput diffInput,
75                                                                  final SyncCrudCounters counters) {
76         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
77         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
78
79         /* Tables - have to be pushed before groups */
80         // TODO enable table-update when ready
81         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
82
83         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
84             @Override
85             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
86                 if (!input.isSuccessful()) {
87                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
88                     //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
89                     //        Futures.asList Arrays.asList(input, output),
90                     //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
91                 }
92                 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
93             }
94         }, MoreExecutors.directExecutor());
95         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
96                 MoreExecutors.directExecutor());
97         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
98             @Override
99             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
100                 if (!input.isSuccessful()) {
101                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
102                 }
103                 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
104             }
105         }, MoreExecutors.directExecutor());
106         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
107                 MoreExecutors.directExecutor());
108         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
109             @Override
110             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
111                 if (!input.isSuccessful()) {
112                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
113                 }
114                 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
115             }
116         }, MoreExecutors.directExecutor());
117         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
118                 MoreExecutors.directExecutor());
119
120
121         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
122             @Override
123             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
124                 if (!input.isSuccessful()) {
125                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
126                 }
127                 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
128             }
129         }, MoreExecutors.directExecutor());
130         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
131                 MoreExecutors.directExecutor());
132         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
133             @Override
134             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
135                 if (!input.isSuccessful()) {
136                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
137                 }
138                 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
139             }
140         }, MoreExecutors.directExecutor());
141         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
142                 MoreExecutors.directExecutor());
143         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
144             @Override
145             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
146                 if (!input.isSuccessful()) {
147                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
148                 }
149                 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
150             }
151         }, MoreExecutors.directExecutor());
152         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
153                 MoreExecutors.directExecutor());
154         return resultVehicle;
155     }
156
157
158     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
159                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
160                                                       final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
161                                                       final SyncCrudCounters counters) {
162         if (flowsInTablesSyncBox.isEmpty()) {
163             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
164             return RpcResultBuilder.<Void>success().buildFuture();
165         }
166
167         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
168         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
169         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
170
171         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
172             final TableKey tableKey = flowsInTableBoxEntry.getKey();
173             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
174
175             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
176
177             for (final Flow flow : flowSyncBox.getItemsToPush()) {
178                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
179
180                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
181                         flow.getId(), tableKey, nodeId, flow.getMatch());
182
183                 allResults.add(JdkFutureAdapters.listenInPoolThread(
184                         flowForwarder.add(flowIdent, flow, nodeIdent)));
185                 flowCrudCounts.incAdded();
186             }
187
188             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
189                 final Flow existingFlow = flowUpdate.getOriginal();
190                 final Flow updatedFlow = flowUpdate.getUpdated();
191
192                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
193                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
194                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
195
196                 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
197                         flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
198                 flowCrudCounts.incUpdated();
199             }
200         }
201
202         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
203                 Futures.allAsList(allResults),
204                 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"),
205                 MoreExecutors.directExecutor());
206
207         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
208                 Futures.allAsList(allUpdateResults),
209                 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"),
210                 MoreExecutors.directExecutor());
211
212         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
213                 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"),
214                 MoreExecutors.directExecutor());
215     }
216
217     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
218                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
219                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
220                                                            final SyncCrudCounters counters) {
221         if (removalPlan.isEmpty()) {
222             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
223             return RpcResultBuilder.<Void>success().buildFuture();
224         }
225
226         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
227         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
228
229         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
230             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
231                     nodeIdent.child(Table.class, flowsPerTable.getKey());
232
233             // loop flows on device and check if the are configured
234             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
235                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
236                         tableIdent.child(Flow.class, flow.getKey());
237                 allResults.add(JdkFutureAdapters.listenInPoolThread(
238                         flowForwarder.remove(flowIdent, flow, nodeIdent)));
239                 flowCrudCounts.incRemoved();
240             }
241         }
242
243         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
244                 Futures.allAsList(allResults),
245                 ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"),
246                 MoreExecutors.directExecutor());
247
248         return Futures.transformAsync(singleVoidResult,
249                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
250                 MoreExecutors.directExecutor());
251
252     }
253
254     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
255                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
256                                                             final ItemSyncBox<Meter> meterRemovalPlan,
257                                                             final SyncCrudCounters counters) {
258         if (meterRemovalPlan.isEmpty()) {
259             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
260             return RpcResultBuilder.<Void>success().buildFuture();
261         }
262
263         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
264
265         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
266         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
267             LOG.trace("removing meter {} - absent in config {}",
268                     meter.getMeterId(), nodeId);
269             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
270                     nodeIdent.child(Meter.class, meter.getKey());
271             allResults.add(JdkFutureAdapters.listenInPoolThread(
272                     meterForwarder.remove(meterIdent, meter, nodeIdent)));
273             meterCrudCounts.incRemoved();
274         }
275
276         return Futures.transform(Futures.allAsList(allResults),
277                 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"),
278                 MoreExecutors.directExecutor());
279     }
280
281     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
282                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
283                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
284                                                             final SyncCrudCounters counters) {
285         if (groupsRemovalPlan.isEmpty()) {
286             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
287             return RpcResultBuilder.<Void>success().buildFuture();
288         }
289
290         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
291
292         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
293         try {
294             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
295             if (LOG.isDebugEnabled()) {
296                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
297                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
298             }
299             Collections.reverse(groupsRemovalPlan);
300             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
301                 chainedResult =
302                         Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
303                             @Override
304                             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
305                                     throws Exception {
306                                 final ListenableFuture<RpcResult<Void>> result;
307                                 if (input.isSuccessful()) {
308                                     result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
309                                 } else {
310                                     // pass through original unsuccessful rpcResult
311                                     result = Futures.immediateFuture(input);
312                                 }
313
314                                 return result;
315                             }
316                         }, MoreExecutors.directExecutor());
317             }
318         } catch (IllegalStateException e) {
319             chainedResult = RpcResultBuilder.<Void>failed()
320                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
321                     .buildFuture();
322         }
323
324         return chainedResult;
325     }
326
327     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
328             final InstanceIdentifier<FlowCapableNode> nodeIdent,
329             final ItemSyncBox<Group> groupsPortion) {
330         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
331         for (Group group : groupsPortion.getItemsToPush()) {
332             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
333             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
334         }
335
336         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
337                 Futures.allAsList(allResults),
338                 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"),
339                 MoreExecutors.directExecutor());
340
341         return Futures.transformAsync(singleVoidResult,
342                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
343                 MoreExecutors.directExecutor());
344     }
345
346     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
347                                                           final FlowCapableNode flowCapableNodeConfigured) {
348         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
349         final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
350
351         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
352         for (Table table : tableList) {
353             TableKey tableKey = table.getKey();
354             KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
355                     .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
356             List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
357             if (tableFeatures != null) {
358                 for (TableFeatures tableFeaturesItem : tableFeatures) {
359                     // TODO uncomment java.lang.NullPointerException
360                     // at
361                     // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
362                     // allResults.add(JdkFutureAdapters.listenInPoolThread(
363                     // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
364                 }
365             }
366         }
367
368         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
369                 Futures.allAsList(allResults),
370                 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"),
371                 MoreExecutors.directExecutor());
372
373         return Futures.transformAsync(singleVoidResult,
374                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
375                 MoreExecutors.directExecutor());
376     }
377
378     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
379             final InstanceIdentifier<FlowCapableNode> nodeIdent,
380             final ItemSyncBox<Group> groupsPortion) {
381         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
382         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
383
384         for (Group group : groupsPortion.getItemsToPush()) {
385             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
386             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
387
388         }
389
390         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
391             final Group existingGroup = groupTuple.getOriginal();
392             final Group group = groupTuple.getUpdated();
393
394             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
395             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
396                     groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
397         }
398
399         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
400                 Futures.allAsList(allResults),
401                 ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"),
402                 MoreExecutors.directExecutor());
403
404         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
405                 Futures.allAsList(allUpdateResults),
406                 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"),
407                 MoreExecutors.directExecutor());
408
409         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
410                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
411                 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"),
412                 MoreExecutors.directExecutor());
413
414
415         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
416                 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
417     }
418
419     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
420                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
421                                                        final ItemSyncBox<Meter> syncBox,
422                                                        final SyncCrudCounters counters) {
423         if (syncBox.isEmpty()) {
424             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
425             return RpcResultBuilder.<Void>success().buildFuture();
426         }
427
428         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
429
430         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
431         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
432         for (Meter meter : syncBox.getItemsToPush()) {
433             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
434             LOG.debug("adding meter {} - absent on device {}",
435                     meter.getMeterId(), nodeId);
436             allResults.add(JdkFutureAdapters.listenInPoolThread(
437                     meterForwarder.add(meterIdent, meter, nodeIdent)));
438             meterCrudCounts.incAdded();
439         }
440
441         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
442             final Meter existingMeter = meterTuple.getOriginal();
443             final Meter updated = meterTuple.getUpdated();
444             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
445             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
446             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
447                     meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
448             meterCrudCounts.incUpdated();
449         }
450
451         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
452                 Futures.allAsList(allResults),
453                 ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"),
454                 MoreExecutors.directExecutor());
455
456         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
457                 Futures.allAsList(allUpdateResults),
458                 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"),
459                 MoreExecutors.directExecutor());
460
461         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
462                 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"),
463                 MoreExecutors.directExecutor());
464     }
465
466     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
467                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
468                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
469                                                        final SyncCrudCounters counters) {
470         if (groupsAddPlan.isEmpty()) {
471             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
472             return RpcResultBuilder.<Void>success().buildFuture();
473         }
474
475         ListenableFuture<RpcResult<Void>> chainedResult;
476         try {
477             if (!groupsAddPlan.isEmpty()) {
478                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
479                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
480                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
481
482                 if (LOG.isDebugEnabled()) {
483                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
484                             groupsAddPlan.size(),
485                             groupCrudCounts.getAdded(),
486                             groupCrudCounts.getUpdated());
487                 }
488
489                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
490                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
491                     chainedResult =
492                             Futures.transformAsync(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
493                                 @Override
494                                 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
495                                         throws Exception {
496                                     final ListenableFuture<RpcResult<Void>> result;
497                                     if (input.isSuccessful()) {
498                                         result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
499                                     } else {
500                                         // pass through original unsuccessful rpcResult
501                                         result = Futures.immediateFuture(input);
502                                     }
503
504                                     return result;
505                                 }
506                             }, MoreExecutors.directExecutor());
507                 }
508             } else {
509                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
510             }
511         } catch (IllegalStateException e) {
512             chainedResult = RpcResultBuilder.<Void>failed()
513                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
514                     .buildFuture();
515         }
516
517         return chainedResult;
518     }
519
520
521     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
522         this.flowForwarder = flowForwarder;
523         return this;
524     }
525
526     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
527         this.tableForwarder = tableForwarder;
528         return this;
529     }
530
531     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
532         this.meterForwarder = meterForwarder;
533         return this;
534     }
535
536     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
537         this.groupForwarder = groupForwarder;
538         return this;
539     }
540
541     public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
542         this.transactionService = transactionService;
543         return this;
544     }
545
546 }