Merge "Removed duplicate sal-binding-config dependency."
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.impl.FlowForwarder;
22 import org.opendaylight.openflowplugin.applications.frsync.impl.GroupForwarder;
23 import org.opendaylight.openflowplugin.applications.frsync.impl.MeterForwarder;
24 import org.opendaylight.openflowplugin.applications.frsync.impl.TableForwarder;
25 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
26 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
28 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
29 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
30 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcError;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
64  */
65 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
66
67     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
68
69     private FlowForwarder flowForwarder;
70     private TableForwarder tableForwarder;
71     private MeterForwarder meterForwarder;
72     private GroupForwarder groupForwarder;
73     private FlowCapableTransactionService transactionService;
74
75     @Override
76     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
77                                                                  final SynchronizationDiffInput diffInput,
78                                                                  final SyncCrudCounters counters) {
79         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
80         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
81
82         /* Tables - have to be pushed before groups */
83         // TODO enable table-update when ready
84         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
85
86         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
87             @Override
88             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
89                 if (!input.isSuccessful()) {
90                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
91                     //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
92                     //        Futures.asList Arrays.asList(input, output),
93                     //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
94                 }
95                 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
96             }
97         });
98         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"));
99         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
100             @Override
101             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
102                 if (!input.isSuccessful()) {
103                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
104                 }
105                 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
106             }
107         });
108         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"));
109         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
110             @Override
111             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
112                 if (!input.isSuccessful()) {
113                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
114                 }
115                 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
116             }
117         });
118         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"));
119
120
121         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
122             @Override
123             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
124                 if (!input.isSuccessful()) {
125                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
126                 }
127                 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
128             }
129         });
130         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"));
131         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
132             @Override
133             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
134                 if (!input.isSuccessful()) {
135                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
136                 }
137                 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
138             }
139         });
140         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"));
141         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
142             @Override
143             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
144                 if (!input.isSuccessful()) {
145                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
146                 }
147                 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
148             }
149         });
150         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
151         return resultVehicle;
152     }
153
154
155     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
156                                                                 final InstanceIdentifier<FlowCapableNode> nodeIdent,
157                                                                 final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
158                                                                 final SyncCrudCounters counters) {
159         if (flowsInTablesSyncBox.isEmpty()) {
160             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
161             return RpcResultBuilder.<Void>success().buildFuture();
162         }
163
164         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
165         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
166         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
167
168         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
169             final TableKey tableKey = flowsInTableBoxEntry.getKey();
170             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
171
172             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
173
174             for (final Flow flow : flowSyncBox.getItemsToPush()) {
175                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
176
177                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
178                         flow.getId(), tableKey, nodeId, flow.getMatch());
179
180                 allResults.add(JdkFutureAdapters.listenInPoolThread(
181                         flowForwarder.add(flowIdent, flow, nodeIdent)));
182                 flowCrudCounts.incAdded();
183             }
184
185             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
186                 final Flow existingFlow = flowUpdate.getOriginal();
187                 final Flow updatedFlow = flowUpdate.getUpdated();
188
189                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
190                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
191                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
192
193                 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
194                         flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
195                 flowCrudCounts.incUpdated();
196             }
197         }
198
199         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
200                 Futures.allAsList(allResults),
201                 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
202
203         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
204                 Futures.allAsList(allUpdateResults),
205                 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
206
207         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
208                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
209                 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
210
211         return summaryResult;
212
213         /*
214         return Futures.transform(summaryResult,
215                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
216                 */
217     }
218
219     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
220                                                                      final InstanceIdentifier<FlowCapableNode> nodeIdent,
221                                                                      final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
222                                                                      final SyncCrudCounters counters) {
223         if (removalPlan.isEmpty()) {
224             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
225             return RpcResultBuilder.<Void>success().buildFuture();
226         }
227
228         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
229         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
230
231         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
232             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
233                     nodeIdent.child(Table.class, flowsPerTable.getKey());
234
235             // loop flows on device and check if the are configured
236             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
237                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
238                         tableIdent.child(Flow.class, flow.getKey());
239                 allResults.add(JdkFutureAdapters.listenInPoolThread(
240                         flowForwarder.remove(flowIdent, flow, nodeIdent)));
241                 flowCrudCounts.incRemoved();
242             }
243         }
244
245         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
246                 Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
247         return Futures.transform(singleVoidResult,
248                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
249
250     }
251
252     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
253                                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
254                                                                       final ItemSyncBox<Meter> meterRemovalPlan,
255                                                                       final SyncCrudCounters counters) {
256         if (meterRemovalPlan.isEmpty()) {
257             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
258             return RpcResultBuilder.<Void>success().buildFuture();
259         }
260
261         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
262
263         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
264         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
265             LOG.trace("removing meter {} - absent in config {}",
266                     meter.getMeterId(), nodeId);
267             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
268                     nodeIdent.child(Meter.class, meter.getKey());
269             allResults.add(JdkFutureAdapters.listenInPoolThread(
270                     meterForwarder.remove(meterIdent, meter, nodeIdent)));
271             meterCrudCounts.incRemoved();
272         }
273
274         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
275                 Futures.allAsList(allResults),
276                 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
277         return singleVoidResult;
278         /*
279         return Futures.transform(singleVoidResult,
280                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
281                 */
282     }
283
284     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
285                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
286                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
287                                                             final SyncCrudCounters counters) {
288         if (groupsRemovalPlan.isEmpty()) {
289             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
290             return RpcResultBuilder.<Void>success().buildFuture();
291         }
292
293         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
294
295         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
296         try {
297             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
298             if (LOG.isDebugEnabled()) {
299                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
300                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
301             }
302             Collections.reverse(groupsRemovalPlan);
303             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
304                 chainedResult =
305                         Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
306                             @Override
307                             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
308                                     throws Exception {
309                                 final ListenableFuture<RpcResult<Void>> result;
310                                 if (input.isSuccessful()) {
311                                     result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
312                                 } else {
313                                     // pass through original unsuccessful rpcResult
314                                     result = Futures.immediateFuture(input);
315                                 }
316
317                                 return result;
318                             }
319                         });
320             }
321         } catch (IllegalStateException e) {
322             chainedResult = RpcResultBuilder.<Void>failed()
323                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
324                     .buildFuture();
325         }
326
327         return chainedResult;
328     }
329
330     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
331             final InstanceIdentifier<FlowCapableNode> nodeIdent,
332             final ItemSyncBox<Group> groupsPortion) {
333         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
334         for (Group group : groupsPortion.getItemsToPush()) {
335             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
336             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
337         }
338
339         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
340                 Futures.allAsList(allResults),
341                 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
342
343         return Futures.transform(singleVoidResult,
344                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
345     }
346
347     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
348                                                           final FlowCapableNode flowCapableNodeConfigured) {
349         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
350         final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
351
352         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
353         for (Table table : tableList) {
354             TableKey tableKey = table.getKey();
355             KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
356                     .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
357             List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
358             if (tableFeatures != null) {
359                 for (TableFeatures tableFeaturesItem : tableFeatures) {
360                     // TODO uncomment java.lang.NullPointerException
361                     // at
362                     // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
363                     // allResults.add(JdkFutureAdapters.listenInPoolThread(
364                     // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
365                 }
366             }
367         }
368
369         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
370                 Futures.allAsList(allResults),
371                 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
372
373         return Futures.transform(singleVoidResult,
374                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
375     }
376
377     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
378             final InstanceIdentifier<FlowCapableNode> nodeIdent,
379             final ItemSyncBox<Group> groupsPortion) {
380         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
381         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
382
383         for (Group group : groupsPortion.getItemsToPush()) {
384             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
385             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
386
387         }
388
389         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
390             final Group existingGroup = groupTuple.getOriginal();
391             final Group group = groupTuple.getUpdated();
392
393             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
394             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
395                     groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
396         }
397
398         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
399                 Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
400
401         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
402                 Futures.allAsList(allUpdateResults),
403                 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
404
405         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
406                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
407                 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
408
409
410         return Futures.transform(summaryResult,
411                 ReconcileUtil.chainBarrierFlush(
412                         PathUtil.digNodePath(nodeIdent), transactionService));
413     }
414
415     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
416                                                                  final InstanceIdentifier<FlowCapableNode> nodeIdent,
417                                                                  final ItemSyncBox<Meter> syncBox,
418                                                                  final SyncCrudCounters counters) {
419         if (syncBox.isEmpty()) {
420             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
421             return RpcResultBuilder.<Void>success().buildFuture();
422         }
423
424         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
425
426         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
427         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
428         for (Meter meter : syncBox.getItemsToPush()) {
429             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
430             LOG.debug("adding meter {} - absent on device {}",
431                     meter.getMeterId(), nodeId);
432             allResults.add(JdkFutureAdapters.listenInPoolThread(
433                     meterForwarder.add(meterIdent, meter, nodeIdent)));
434             meterCrudCounts.incAdded();
435         }
436
437         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
438             final Meter existingMeter = meterTuple.getOriginal();
439             final Meter updated = meterTuple.getUpdated();
440             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
441             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
442             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
443                     meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
444             meterCrudCounts.incUpdated();
445         }
446
447         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
448                 Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
449
450         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
451                 Futures.allAsList(allUpdateResults),
452                 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
453
454         final ListenableFuture<RpcResult<Void>> summaryResults = Futures.transform(
455                 Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
456                 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
457
458         return summaryResults;
459
460         /*
461         return Futures.transform(summaryResults,
462                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
463                 */
464     }
465
466     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
467                                                                  final InstanceIdentifier<FlowCapableNode> nodeIdent,
468                                                                  final List<ItemSyncBox<Group>> groupsAddPlan,
469                                                                  final SyncCrudCounters counters) {
470         if (groupsAddPlan.isEmpty()) {
471             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
472             return RpcResultBuilder.<Void>success().buildFuture();
473         }
474
475         ListenableFuture<RpcResult<Void>> chainedResult;
476         try {
477             if (!groupsAddPlan.isEmpty()) {
478                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
479                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
480                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
481
482                 if (LOG.isDebugEnabled()) {
483                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
484                             groupsAddPlan.size(),
485                             groupCrudCounts.getAdded(),
486                             groupCrudCounts.getUpdated());
487                 }
488
489                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
490                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
491                     chainedResult =
492                             Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
493                                 @Override
494                                 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
495                                         throws Exception {
496                                     final ListenableFuture<RpcResult<Void>> result;
497                                     if (input.isSuccessful()) {
498                                         result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
499                                     } else {
500                                         // pass through original unsuccessful rpcResult
501                                         result = Futures.immediateFuture(input);
502                                     }
503
504                                     return result;
505                                 }
506                             });
507                 }
508             } else {
509                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
510             }
511         } catch (IllegalStateException e) {
512             chainedResult = RpcResultBuilder.<Void>failed()
513                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
514                     .buildFuture();
515         }
516
517         return chainedResult;
518     }
519
520
521     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
522         this.flowForwarder = flowForwarder;
523         return this;
524     }
525
526     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
527         this.tableForwarder = tableForwarder;
528         return this;
529     }
530
531     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
532         this.meterForwarder = meterForwarder;
533         return this;
534     }
535
536     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
537         this.groupForwarder = groupForwarder;
538         return this;
539     }
540
541     public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
542         this.transactionService = transactionService;
543         return this;
544     }
545
546 }