FRS - sonar issues and repackage
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyIncrementalImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
21 import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
22 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
23 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
24 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.common.RpcError;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Execute CRUD API for flow + group + meter involving one-by-one (incremental) strategy.
60  */
61 public class SyncPlanPushStrategyIncrementalImpl implements SyncPlanPushStrategy {
62
63     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImpl.class);
64
65     private FlowForwarder flowForwarder;
66     private MeterForwarder meterForwarder;
67     private GroupForwarder groupForwarder;
68     private TableForwarder tableForwarder;
69     private FlowCapableTransactionService transactionService;
70
71     @Override
72     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
73                                                                  final SynchronizationDiffInput diffInput,
74                                                                  final SyncCrudCounters counters) {
75         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
76         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
77
78         /* Tables - have to be pushed before groups */
79         // TODO enable table-update when ready
80         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
81
82         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
83             @Override
84             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
85                 if (!input.isSuccessful()) {
86                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
87                     //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
88                     //        Futures.asList Arrays.asList(input, output),
89                     //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
90                 }
91                 return addMissingGroups(nodeId, nodeIdent, diffInput.getGroupsToAddOrUpdate(), counters);
92             }
93         });
94         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingGroups"));
95         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
96             @Override
97             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
98                 if (!input.isSuccessful()) {
99                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
100                 }
101                 return addMissingMeters(nodeId, nodeIdent, diffInput.getMetersToAddOrUpdate(), counters);
102             }
103         });
104         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingMeters"));
105         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
106             @Override
107             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
108                 if (!input.isSuccessful()) {
109                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
110                 }
111                 return addMissingFlows(nodeId, nodeIdent, diffInput.getFlowsToAddOrUpdate(), counters);
112             }
113         });
114         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "addMissingFlows"));
115
116
117         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
118             @Override
119             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
120                 if (!input.isSuccessful()) {
121                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
122                 }
123                 return removeRedundantFlows(nodeId, nodeIdent, diffInput.getFlowsToRemove(), counters);
124             }
125         });
126         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantFlows"));
127         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
128             @Override
129             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
130                 if (!input.isSuccessful()) {
131                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
132                 }
133                 return removeRedundantMeters(nodeId, nodeIdent, diffInput.getMetersToRemove(), counters);
134             }
135         });
136         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantMeters"));
137         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
138             @Override
139             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
140                 if (!input.isSuccessful()) {
141                     //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
142                 }
143                 return removeRedundantGroups(nodeId, nodeIdent, diffInput.getGroupsToRemove(), counters);
144             }
145         });
146         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
147         return resultVehicle;
148     }
149
150
151     ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
152                                                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
153                                                       final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
154                                                       final SyncCrudCounters counters) {
155         if (flowsInTablesSyncBox.isEmpty()) {
156             LOG.trace("no tables in config for node: {} -> SKIPPING", nodeId.getValue());
157             return RpcResultBuilder.<Void>success().buildFuture();
158         }
159
160         final List<ListenableFuture<RpcResult<AddFlowOutput>>> allResults = new ArrayList<>();
161         final List<ListenableFuture<RpcResult<UpdateFlowOutput>>> allUpdateResults = new ArrayList<>();
162         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
163
164         for (Map.Entry<TableKey, ItemSyncBox<Flow>> flowsInTableBoxEntry : flowsInTablesSyncBox.entrySet()) {
165             final TableKey tableKey = flowsInTableBoxEntry.getKey();
166             final ItemSyncBox<Flow> flowSyncBox = flowsInTableBoxEntry.getValue();
167
168             final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class, tableKey);
169
170             for (final Flow flow : flowSyncBox.getItemsToPush()) {
171                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, flow.getKey());
172
173                 LOG.trace("adding flow {} in table {} - absent on device {} match{}",
174                         flow.getId(), tableKey, nodeId, flow.getMatch());
175
176                 allResults.add(JdkFutureAdapters.listenInPoolThread(
177                         flowForwarder.add(flowIdent, flow, nodeIdent)));
178                 flowCrudCounts.incAdded();
179             }
180
181             for (final ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowSyncBox.getItemsToUpdate()) {
182                 final Flow existingFlow = flowUpdate.getOriginal();
183                 final Flow updatedFlow = flowUpdate.getUpdated();
184
185                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class, updatedFlow.getKey());
186                 LOG.trace("flow {} in table {} - needs update on device {} match{}",
187                         updatedFlow.getId(), tableKey, nodeId, updatedFlow.getMatch());
188
189                 allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
190                         flowForwarder.update(flowIdent, existingFlow, updatedFlow, nodeIdent)));
191                 flowCrudCounts.incUpdated();
192             }
193         }
194
195         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
196                 Futures.allAsList(allResults),
197                 ReconcileUtil.<AddFlowOutput>createRpcResultCondenser("flow adding"));
198
199         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
200                 Futures.allAsList(allUpdateResults),
201                 ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("flow updating"));
202
203         return Futures.transform(Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
204                 ReconcileUtil.<Void>createRpcResultCondenser("flow add/update"));
205     }
206
207     ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
208                                                            final InstanceIdentifier<FlowCapableNode> nodeIdent,
209                                                            final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
210                                                            final SyncCrudCounters counters) {
211         if (removalPlan.isEmpty()) {
212             LOG.trace("no tables in operational for node: {} -> SKIPPING", nodeId.getValue());
213             return RpcResultBuilder.<Void>success().buildFuture();
214         }
215
216         final List<ListenableFuture<RpcResult<RemoveFlowOutput>>> allResults = new ArrayList<>();
217         final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
218
219         for (final Map.Entry<TableKey, ItemSyncBox<Flow>> flowsPerTable : removalPlan.entrySet()) {
220             final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
221                     nodeIdent.child(Table.class, flowsPerTable.getKey());
222
223             // loop flows on device and check if the are configured
224             for (final Flow flow : flowsPerTable.getValue().getItemsToPush()) {
225                 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
226                         tableIdent.child(Flow.class, flow.getKey());
227                 allResults.add(JdkFutureAdapters.listenInPoolThread(
228                         flowForwarder.remove(flowIdent, flow, nodeIdent)));
229                 flowCrudCounts.incRemoved();
230             }
231         }
232
233         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
234                 Futures.allAsList(allResults), ReconcileUtil.<RemoveFlowOutput>createRpcResultCondenser("flow remove"));
235         return Futures.transform(singleVoidResult,
236                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
237
238     }
239
240     ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
241                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
242                                                             final ItemSyncBox<Meter> meterRemovalPlan,
243                                                             final SyncCrudCounters counters) {
244         if (meterRemovalPlan.isEmpty()) {
245             LOG.trace("no meters on device for node: {} -> SKIPPING", nodeId.getValue());
246             return RpcResultBuilder.<Void>success().buildFuture();
247         }
248
249         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
250
251         final List<ListenableFuture<RpcResult<RemoveMeterOutput>>> allResults = new ArrayList<>();
252         for (Meter meter : meterRemovalPlan.getItemsToPush()) {
253             LOG.trace("removing meter {} - absent in config {}",
254                     meter.getMeterId(), nodeId);
255             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
256                     nodeIdent.child(Meter.class, meter.getKey());
257             allResults.add(JdkFutureAdapters.listenInPoolThread(
258                     meterForwarder.remove(meterIdent, meter, nodeIdent)));
259             meterCrudCounts.incRemoved();
260         }
261
262         return Futures.transform(Futures.allAsList(allResults),
263                 ReconcileUtil.<RemoveMeterOutput>createRpcResultCondenser("meter remove"));
264     }
265
266     ListenableFuture<RpcResult<Void>> removeRedundantGroups(final NodeId nodeId,
267                                                             final InstanceIdentifier<FlowCapableNode> nodeIdent,
268                                                             final List<ItemSyncBox<Group>> groupsRemovalPlan,
269                                                             final SyncCrudCounters counters) {
270         if (groupsRemovalPlan.isEmpty()) {
271             LOG.trace("no groups on device for node: {} -> SKIPPING", nodeId.getValue());
272             return RpcResultBuilder.<Void>success().buildFuture();
273         }
274
275         final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
276
277         ListenableFuture<RpcResult<Void>> chainedResult = RpcResultBuilder.<Void>success().buildFuture();
278         try {
279             groupCrudCounts.setRemoved(ReconcileUtil.countTotalPushed(groupsRemovalPlan));
280             if (LOG.isDebugEnabled()) {
281                 LOG.debug("removing groups: planSteps={}, toRemoveTotal={}",
282                         groupsRemovalPlan.size(), groupCrudCounts.getRemoved());
283             }
284             Collections.reverse(groupsRemovalPlan);
285             for (final ItemSyncBox<Group> groupsPortion : groupsRemovalPlan) {
286                 chainedResult =
287                         Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
288                             @Override
289                             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
290                                     throws Exception {
291                                 final ListenableFuture<RpcResult<Void>> result;
292                                 if (input.isSuccessful()) {
293                                     result = flushRemoveGroupPortionAndBarrier(nodeIdent, groupsPortion);
294                                 } else {
295                                     // pass through original unsuccessful rpcResult
296                                     result = Futures.immediateFuture(input);
297                                 }
298
299                                 return result;
300                             }
301                         });
302             }
303         } catch (IllegalStateException e) {
304             chainedResult = RpcResultBuilder.<Void>failed()
305                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
306                     .buildFuture();
307         }
308
309         return chainedResult;
310     }
311
312     private ListenableFuture<RpcResult<Void>> flushRemoveGroupPortionAndBarrier(
313             final InstanceIdentifier<FlowCapableNode> nodeIdent,
314             final ItemSyncBox<Group> groupsPortion) {
315         List<ListenableFuture<RpcResult<RemoveGroupOutput>>> allResults = new ArrayList<>();
316         for (Group group : groupsPortion.getItemsToPush()) {
317             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
318             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.remove(groupIdent, group, nodeIdent)));
319         }
320
321         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
322                 Futures.allAsList(allResults),
323                 ReconcileUtil.<RemoveGroupOutput>createRpcResultCondenser("group remove"));
324
325         return Futures.transform(singleVoidResult,
326                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
327     }
328
329     ListenableFuture<RpcResult<Void>> updateTableFeatures(final InstanceIdentifier<FlowCapableNode> nodeIdent,
330                                                           final FlowCapableNode flowCapableNodeConfigured) {
331         // CHECK if while pushing the update, updateTableInput can be null to emulate a table add
332         final List<Table> tableList = ReconcileUtil.safeTables(flowCapableNodeConfigured);
333
334         final List<ListenableFuture<RpcResult<UpdateTableOutput>>> allResults = new ArrayList<>();
335         for (Table table : tableList) {
336             TableKey tableKey = table.getKey();
337             KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdent
338                     .child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
339             List<TableFeatures> tableFeatures = flowCapableNodeConfigured.getTableFeatures();
340             if (tableFeatures != null) {
341                 for (TableFeatures tableFeaturesItem : tableFeatures) {
342                     // TODO uncomment java.lang.NullPointerException
343                     // at
344                     // org.opendaylight.openflowjava.protocol.impl.serialization.match.AbstractOxmMatchEntrySerializer.serializeHeader(AbstractOxmMatchEntrySerializer.java:31
345                     // allResults.add(JdkFutureAdapters.listenInPoolThread(
346                     // tableForwarder.update(tableFeaturesII, null, tableFeaturesItem, nodeIdent)));
347                 }
348             }
349         }
350
351         final ListenableFuture<RpcResult<Void>> singleVoidResult = Futures.transform(
352                 Futures.allAsList(allResults),
353                 ReconcileUtil.<UpdateTableOutput>createRpcResultCondenser("table update"));
354
355         return Futures.transform(singleVoidResult,
356                 ReconcileUtil.chainBarrierFlush(PathUtil.digNodePath(nodeIdent), transactionService));
357     }
358
359     private ListenableFuture<RpcResult<Void>> flushAddGroupPortionAndBarrier(
360             final InstanceIdentifier<FlowCapableNode> nodeIdent,
361             final ItemSyncBox<Group> groupsPortion) {
362         final List<ListenableFuture<RpcResult<AddGroupOutput>>> allResults = new ArrayList<>();
363         final List<ListenableFuture<RpcResult<UpdateGroupOutput>>> allUpdateResults = new ArrayList<>();
364
365         for (Group group : groupsPortion.getItemsToPush()) {
366             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
367             allResults.add(JdkFutureAdapters.listenInPoolThread(groupForwarder.add(groupIdent, group, nodeIdent)));
368
369         }
370
371         for (ItemSyncBox.ItemUpdateTuple<Group> groupTuple : groupsPortion.getItemsToUpdate()) {
372             final Group existingGroup = groupTuple.getOriginal();
373             final Group group = groupTuple.getUpdated();
374
375             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class, group.getKey());
376             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
377                     groupForwarder.update(groupIdent, existingGroup, group, nodeIdent)));
378         }
379
380         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
381                 Futures.allAsList(allResults), ReconcileUtil.<AddGroupOutput>createRpcResultCondenser("group add"));
382
383         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
384                 Futures.allAsList(allUpdateResults),
385                 ReconcileUtil.<UpdateGroupOutput>createRpcResultCondenser("group update"));
386
387         final ListenableFuture<RpcResult<Void>> summaryResult = Futures.transform(
388                 Futures.allAsList(singleVoidAddResult, singleVoidUpdateResult),
389                 ReconcileUtil.<Void>createRpcResultCondenser("group add/update"));
390
391
392         return Futures.transform(summaryResult, ReconcileUtil.chainBarrierFlush(
393                 PathUtil.digNodePath(nodeIdent), transactionService));
394     }
395
396     ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
397                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
398                                                        final ItemSyncBox<Meter> syncBox,
399                                                        final SyncCrudCounters counters) {
400         if (syncBox.isEmpty()) {
401             LOG.trace("no meters configured for node: {} -> SKIPPING", nodeId.getValue());
402             return RpcResultBuilder.<Void>success().buildFuture();
403         }
404
405         final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
406
407         final List<ListenableFuture<RpcResult<AddMeterOutput>>> allResults = new ArrayList<>();
408         final List<ListenableFuture<RpcResult<UpdateMeterOutput>>> allUpdateResults = new ArrayList<>();
409         for (Meter meter : syncBox.getItemsToPush()) {
410             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, meter.getKey());
411             LOG.debug("adding meter {} - absent on device {}",
412                     meter.getMeterId(), nodeId);
413             allResults.add(JdkFutureAdapters.listenInPoolThread(
414                     meterForwarder.add(meterIdent, meter, nodeIdent)));
415             meterCrudCounts.incAdded();
416         }
417
418         for (ItemSyncBox.ItemUpdateTuple<Meter> meterTuple : syncBox.getItemsToUpdate()) {
419             final Meter existingMeter = meterTuple.getOriginal();
420             final Meter updated = meterTuple.getUpdated();
421             final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class, updated.getKey());
422             LOG.trace("meter {} - needs update on device {}", updated.getMeterId(), nodeId);
423             allUpdateResults.add(JdkFutureAdapters.listenInPoolThread(
424                     meterForwarder.update(meterIdent, existingMeter, updated, nodeIdent)));
425             meterCrudCounts.incUpdated();
426         }
427
428         final ListenableFuture<RpcResult<Void>> singleVoidAddResult = Futures.transform(
429                 Futures.allAsList(allResults), ReconcileUtil.<AddMeterOutput>createRpcResultCondenser("meter add"));
430
431         final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
432                 Futures.allAsList(allUpdateResults),
433                 ReconcileUtil.<UpdateMeterOutput>createRpcResultCondenser("meter update"));
434
435         return Futures.transform(Futures.allAsList(singleVoidUpdateResult, singleVoidAddResult),
436                 ReconcileUtil.<Void>createRpcResultCondenser("meter add/update"));
437     }
438
439     ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
440                                                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
441                                                        final List<ItemSyncBox<Group>> groupsAddPlan,
442                                                        final SyncCrudCounters counters) {
443         if (groupsAddPlan.isEmpty()) {
444             LOG.trace("no groups configured for node: {} -> SKIPPING", nodeId.getValue());
445             return RpcResultBuilder.<Void>success().buildFuture();
446         }
447
448         ListenableFuture<RpcResult<Void>> chainedResult;
449         try {
450             if (!groupsAddPlan.isEmpty()) {
451                 final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
452                 groupCrudCounts.setAdded(ReconcileUtil.countTotalPushed(groupsAddPlan));
453                 groupCrudCounts.setUpdated(ReconcileUtil.countTotalUpdated(groupsAddPlan));
454
455                 if (LOG.isDebugEnabled()) {
456                     LOG.debug("adding groups: planSteps={}, toAddTotal={}, toUpdateTotal={}",
457                             groupsAddPlan.size(),
458                             groupCrudCounts.getAdded(),
459                             groupCrudCounts.getUpdated());
460                 }
461
462                 chainedResult = flushAddGroupPortionAndBarrier(nodeIdent, groupsAddPlan.get(0));
463                 for (final ItemSyncBox<Group> groupsPortion : Iterables.skip(groupsAddPlan, 1)) {
464                     chainedResult =
465                             Futures.transform(chainedResult, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
466                                 @Override
467                                 public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input)
468                                         throws Exception {
469                                     final ListenableFuture<RpcResult<Void>> result;
470                                     if (input.isSuccessful()) {
471                                         result = flushAddGroupPortionAndBarrier(nodeIdent, groupsPortion);
472                                     } else {
473                                         // pass through original unsuccessful rpcResult
474                                         result = Futures.immediateFuture(input);
475                                     }
476
477                                     return result;
478                                 }
479                             });
480                 }
481             } else {
482                 chainedResult = RpcResultBuilder.<Void>success().buildFuture();
483             }
484         } catch (IllegalStateException e) {
485             chainedResult = RpcResultBuilder.<Void>failed()
486                     .withError(RpcError.ErrorType.APPLICATION, "failed to add missing groups", e)
487                     .buildFuture();
488         }
489
490         return chainedResult;
491     }
492
493
494     public SyncPlanPushStrategyIncrementalImpl setFlowForwarder(final FlowForwarder flowForwarder) {
495         this.flowForwarder = flowForwarder;
496         return this;
497     }
498
499     public SyncPlanPushStrategyIncrementalImpl setTableForwarder(final TableForwarder tableForwarder) {
500         this.tableForwarder = tableForwarder;
501         return this;
502     }
503
504     public SyncPlanPushStrategyIncrementalImpl setMeterForwarder(final MeterForwarder meterForwarder) {
505         this.meterForwarder = meterForwarder;
506         return this;
507     }
508
509     public SyncPlanPushStrategyIncrementalImpl setGroupForwarder(final GroupForwarder groupForwarder) {
510         this.groupForwarder = groupForwarder;
511         return this;
512     }
513
514     public SyncPlanPushStrategyIncrementalImpl setTransactionService(final FlowCapableTransactionService transactionService) {
515         this.transactionService = transactionService;
516         return this;
517     }
518
519 }