40265410b004559fe6cebef535e39ca3f8552b06
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
22 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
24 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.common.RpcError;
51 import org.opendaylight.yangtools.yang.common.RpcResult;
52 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
58  */
59 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
60
61     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
62
63     private FlowForwarder flowForwarder;
64     private MeterForwarder meterForwarder;
65     private GroupForwarder groupForwarder;
66     private TableForwarder tableForwarder;
67     private FlowCapableTransactionService transactionService;
68
69     @Override
70     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
71                                                                  final SynchronizationDiffInput diffInput,
72                                                                  final SyncCrudCounters counters) {
73         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
74         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
75
76         /* Tables - have to be pushed before groups */
77         // TODO enable table-update when ready
78         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
79
80         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
81             if (!input.isSuccessful()) {
82                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
83                 //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
84                 //        Futures.asList Arrays.asList(input, output),
85                 //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
86             }
87             return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
88         }, MoreExecutors.directExecutor());
89         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"),
90                 MoreExecutors.directExecutor());
91         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
92             if (!input.isSuccessful()) {
93                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
94             }
95             return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
96         }, MoreExecutors.directExecutor());
97         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"),
98                 MoreExecutors.directExecutor());
99         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
100             if (!input.isSuccessful()) {
101                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
102             }
103             return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
104         }, MoreExecutors.directExecutor());
105         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"),
106                 MoreExecutors.directExecutor());
107
108
109         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
110             if (!input.isSuccessful()) {
111                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
112             }
113             return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
114         }, MoreExecutors.directExecutor());
115         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"),
116                 MoreExecutors.directExecutor());
117         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
118             if (!input.isSuccessful()) {
119                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
120             }
121             return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
122         }, MoreExecutors.directExecutor());
123         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"),
124                 MoreExecutors.directExecutor());
125         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
126             if (!input.isSuccessful()) {
127                 //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
128             }
129             return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
130         }, MoreExecutors.directExecutor());
131         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"),
132                 MoreExecutors.directExecutor());
133         return resultVehicle;
134     }
135
136
137     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
138                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
139                                                       final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
140                                                       final SyncCrudCounters counters) {
141         if (flowsInTablesSyncBox.isEmpty()) {
142             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
143             return RpcResultBuilder.<Void>success().buildFuture();
144         }
145
146         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
147         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
148         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
149
150         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
151             final TableKey tableKey = flowsInTableBoxEntry.getKey();
152             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
153
154             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
155
156             for (final Flow flow : flowSyncBox.getItemsToPush()) {
157                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
158
159                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
160                         flow.getId(), tableKey, nodeId, flow.getMatch());
161
162                 allResults.add(JdkFutureAdapters.listenInPoolThread(
163                         flowForwarder.add(flowIdent, flow, nodeIdent)));
164                 flowCrudCounts.incAdded();
165             }
166
167             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
168                 final Flow existingFlow = flowUpdate.getOriginal();
169                 final Flow updatedFlow = flowUpdate.getUpdated();
170
171                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
172                         updatedFlow.getKey());
173                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
174                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
175
176                 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
177                         flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
178                 flowCrudCounts.incUpdated();
179             }
180         }
181
182         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
183                 Futures.allAsList(allResults),
184                 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"),
185                 MoreExecutors.directExecutor());
186
187         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
188                 Futures.allAsList(allUpdateResults),
189                 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"),
190                 MoreExecutors.directExecutor());
191
192         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
193                 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"),
194                 MoreExecutors.directExecutor());
195     }
196
197     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
198                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
199                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
200                                                            final SyncCrudCounters counters) {
201         if (removalPlan.isEmpty()) {
202             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
203             return RpcResultBuilder.<Void>success().buildFuture();
204         }
205
206         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
207         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
208
209         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
210             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
211                     nodeIdent.child(Table.class, flowsPerTable.getKey());
212
213             // loop flows on device and check if the are configured
214             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
215                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
216                         tableIdent.child(Flow.class, flow.getKey());
217                 allResults.add(JdkFutureAdapters.listenInPoolThread(
218                         flowForwarder.remove(flowIdent, flow, nodeIdent)));
219                 flowCrudCounts.incRemoved();
220             }
221         }
222
223         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
224                 Futures.allAsList(allResults),
225                 ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"),
226                 MoreExecutors.directExecutor());
227
228         return Futures.transformAsync(singleVoidResult,
229                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
230                 MoreExecutors.directExecutor());
231
232     }
233
234     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
235                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
236                                                             final ItemSyncBox<Meter> meterRemovalPlan,
237                                                             final SyncCrudCounters counters) {
238         if (meterRemovalPlan.isEmpty()) {
239             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
240             return RpcResultBuilder.<Void>success().buildFuture();
241         }
242
243         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
244
245         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
246         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
247             LOG.trace("removing meter {} - absent in config {}",
248                     meter.getMeterId(), nodeId);
249             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
250                     nodeIdent.child(Meter.class, meter.getKey());
251             allResults.add(JdkFutureAdapters.listenInPoolThread(
252                     meterForwarder.remove(meterIdent, meter, nodeIdent)));
253             meterCrudCounts.incRemoved();
254         }
255
256         return Futures.transform(Futures.allAsList(allResults),
257                 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"),
258                 MoreExecutors.directExecutor());
259     }
260
261     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
262                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
263                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
264                                                             final SyncCrudCounters counters) {
265         if (groupsRemovalPlan.isEmpty()) {
266             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
267             return RpcResultBuilder.<Void>success().buildFuture();
268         }
269
270         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
271
272         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
273         try {
274             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
275             if (LOG.isDebugEnabled()) {
276                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
277                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
278             }
279             Collections.reverse(groupsRemovalPlan);
280             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
281                 chainedResult = Futures.transformAsync(chainedResult, input -> {
282                     final ListenableFuture<RpcResult<Void>> result;
283                     if (input.isSuccessful()) {
284                         result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
285                     } else {
286                         // pass through original unsuccessful rpcResult
287                         result = Futures.immediateFuture(input);
288                     }
289
290                     return result;
291                 }, MoreExecutors.directExecutor());
292             }
293         } catch (IllegalStateException e) {
294             chainedResult = RpcResultBuilder.<Void>failed()
295                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
296                     .buildFuture();
297         }
298
299         return chainedResult;
300     }
301
302     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
303             final InstanceIdentifier<FlowCapableNode> nodeIdent,
304             final ItemSyncBox<Group> groupsPortion) {
305         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
306         for (Group group : groupsPortion.getItemsToPush()) {
307             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
308             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
309         }
310
311         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
312                 Futures.allAsList(allResults),
313                 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"),
314                 MoreExecutors.directExecutor());
315
316         return Futures.transformAsync(singleVoidResult,
317                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
318                 MoreExecutors.directExecutor());
319     }
320
321     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
322                                                           final FlowCapableNode flowCapableNodeConfigured) {
323         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
324         //final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
325
326         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
327 //        for (Table table : tableList) {
328 //            List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
329 //            if (tableFeatures != null) {
330 //                for (TableFeatures tableFeaturesItem : tableFeatures) {
331 //                    // TODO uncomment java.lang.NullPointerException
332 //                    // at
333 //                    // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer
334 //                    //    .serializeHeader(AbstractOxmMatchEntrySerializer.java:31
335 //                    // allResults.add(JdkFutureAdapters.listenInPoolThread(
336 //                    // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
337 //                }
338 //            }
339 //        }
340
341         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
342                 Futures.allAsList(allResults),
343                 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"),
344                 MoreExecutors.directExecutor());
345
346         return Futures.transformAsync(singleVoidResult,
347                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService),
348                 MoreExecutors.directExecutor());
349     }
350
351     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
352             final InstanceIdentifier<FlowCapableNode> nodeIdent,
353             final ItemSyncBox<Group> groupsPortion) {
354         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
355         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
356
357         for (Group group : groupsPortion.getItemsToPush()) {
358             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
359             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
360
361         }
362
363         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
364             final Group existingGroup = groupTuple.getOriginal();
365             final Group group = groupTuple.getUpdated();
366
367             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
368             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
369                     groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
370         }
371
372         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
373                 Futures.allAsList(allResults),
374                 ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"),
375                 MoreExecutors.directExecutor());
376
377         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
378                 Futures.allAsList(allUpdateResults),
379                 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"),
380                 MoreExecutors.directExecutor());
381
382         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
383                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
384                 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"),
385                 MoreExecutors.directExecutor());
386
387
388         return Futures.transformAsync(summaryResult, ReconcileUtil.chainBarrierFlush(
389                 PathUtil.digNodePath(nodeIdent), transactionService), MoreExecutors.directExecutor());
390     }
391
392     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
393                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
394                                                        final ItemSyncBox<Meter> syncBox,
395                                                        final SyncCrudCounters counters) {
396         if (syncBox.isEmpty()) {
397             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
398             return RpcResultBuilder.<Void>success().buildFuture();
399         }
400
401         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
402
403         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
404         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
405         for (Meter meter : syncBox.getItemsToPush()) {
406             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
407             LOG.debug("adding meter {} - absent on device {}",
408                     meter.getMeterId(), nodeId);
409             allResults.add(JdkFutureAdapters.listenInPoolThread(
410                     meterForwarder.add(meterIdent, meter, nodeIdent)));
411             meterCrudCounts.incAdded();
412         }
413
414         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
415             final Meter existingMeter = meterTuple.getOriginal();
416             final Meter updated = meterTuple.getUpdated();
417             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
418             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
419             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
420                     meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
421             meterCrudCounts.incUpdated();
422         }
423
424         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
425                 Futures.allAsList(allResults),
426                 ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"),
427                 MoreExecutors.directExecutor());
428
429         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
430                 Futures.allAsList(allUpdateResults),
431                 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"),
432                 MoreExecutors.directExecutor());
433
434         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
435                 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"),
436                 MoreExecutors.directExecutor());
437     }
438
439     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
440                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
441                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
442                                                        final SyncCrudCounters counters) {
443         if (groupsAddPlan.isEmpty()) {
444             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
445             return RpcResultBuilder.<Void>success().buildFuture();
446         }
447
448         ListenableFuture<RpcResult<Void>> chainedResult;
449         try {
450             if (!groupsAddPlan.isEmpty()) {
451                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
452                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
453                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
454
455                 if (LOG.isDebugEnabled()) {
456                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
457                             groupsAddPlan.size(),
458                             groupCrudCounts.getAdded(),
459                             groupCrudCounts.getUpdated());
460                 }
461
462                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
463                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
464                     chainedResult =
465                         Futures.transformAsync(chainedResult, input -> {
466                             final ListenableFuture<RpcResult<Void>> result;
467                             if (input.isSuccessful()) {
468                                 result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
469                             } else {
470                                 // pass through original unsuccessful rpcResult
471                                 result = Futures.immediateFuture(input);
472                             }
473
474                             return result;
475                         }, MoreExecutors.directExecutor());
476                 }
477             } else {
478                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
479             }
480         } catch (IllegalStateException e) {
481             chainedResult = RpcResultBuilder.<Void>failed()
482                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
483                     .buildFuture();
484         }
485
486         return chainedResult;
487     }
488
489
490     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
491         this.flowForwarder = flowForwarder;
492         return this;
493     }
494
495     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
496         this.tableForwarder = tableForwarder;
497         return this;
498     }
499
500     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
501         this.meterForwarder = meterForwarder;
502         return this;
503     }
504
505     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
506         this.groupForwarder = groupForwarder;
507         return this;
508     }
509
510     public SyncPlanPushStrategyIncrementalImpl setTransactionService(
511             final FlowCapableTransactionService transactionService) {
512         this.transactionService = transactionService;
513         return this;
514     }
515
516 }