Update MRI projects for Aluminium
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.Iterators;
12 import com.google.common.collect.PeekingIterator;
13 import com.google.common.collect.Range;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
26 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
28 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
84 import org.opendaylight.yangtools.yang.common.RpcResult;
85 import org.opendaylight.yangtools.yang.common.Uint16;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89 /**
90  * Execute CRUD API for flow + group + meter involving flat-batch strategy.
91  */
92 public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
93
94     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
95
96     private SalFlatBatchService flatBatchService;
97     private TableForwarder tableForwarder;
98
99     @Override
100     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
101                                                                  final SynchronizationDiffInput diffInput,
102                                                                  final SyncCrudCounters counters) {
103         // prepare default (full) counts
104         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
105         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
106         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
107
108         counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(
109                 diffInput.getFlowsToAddOrUpdate().values()));
110         counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(
111                 diffInput.getFlowsToAddOrUpdate().values()));
112         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
113
114         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
115         counters.getMeterCrudCounts().setUpdated(diffInput.getMetersToAddOrUpdate().getItemsToUpdate().size());
116         counters.getMeterCrudCounts().setRemoved(diffInput.getMetersToRemove().getItemsToPush().size());
117
118         /* Tables - have to be pushed before groups */
119         // TODO enable table-update when ready
120         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
121
122         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
123             final List<Batch> batchBag = new ArrayList<>();
124             int batchOrder = 0;
125
126             batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
127             batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
128             batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
129
130             batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
131             batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
132             batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
133
134             LOG.trace("Index of last batch step: {}", batchOrder);
135
136             final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
137                     .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
138                     // TODO: propagate from input
139                     .setExitOnFirstError(false)
140                     .setBatch(batchBag)
141                     .build();
142
143             final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture =
144                     flatBatchService.processFlatBatch(flatBatchInput);
145
146             if (LOG.isDebugEnabled()) {
147                 Futures.addCallback(rpcResultFuture, createCounterCallback(batchBag, batchOrder, counters),
148                     MoreExecutors.directExecutor());
149             }
150
151             return Futures.transform(rpcResultFuture,
152                     ReconcileUtil.createRpcResultToVoidFunction("flat-batch"),
153                     MoreExecutors.directExecutor());
154         }, MoreExecutors.directExecutor());
155         return resultVehicle;
156     }
157
158     private FutureCallback<RpcResult<ProcessFlatBatchOutput>> createCounterCallback(final List<Batch> inputBatchBag,
159                                                                                     final int failureIndexLimit,
160                                                                                     final SyncCrudCounters counters) {
161         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
162             @Override
163             public void onSuccess(final RpcResult<ProcessFlatBatchOutput> result) {
164                 if (!result.isSuccessful() && result.getResult() != null
165                         && !result.getResult().getBatchFailure().isEmpty()) {
166                     Map<Range<Uint16>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
167                     decrementBatchFailuresCounters(result.getResult().nonnullBatchFailure().values(), batchMap,
168                             counters);
169                 }
170             }
171
172             @Override
173             public void onFailure(final Throwable failure) {
174                 counters.resetAll();
175             }
176         };
177     }
178
179     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
180             justification = "https://github.com/spotbugs/spotbugs/issues/811")
181     private static void decrementBatchFailuresCounters(final Collection<BatchFailure> batchFailures,
182                                                 final Map<Range<Uint16>, Batch> batchMap,
183                                                 final SyncCrudCounters counters) {
184         for (BatchFailure batchFailure : batchFailures) {
185             for (Map.Entry<Range<Uint16>, Batch> rangeBatchEntry : batchMap.entrySet()) {
186                 if (rangeBatchEntry.getKey().contains(batchFailure.getBatchOrder())) {
187                     // get type and decrease
188                     final BatchChoice batchChoice = rangeBatchEntry.getValue().getBatchChoice();
189                     decrementCounters(batchChoice, counters);
190                     break;
191                 }
192             }
193         }
194     }
195
196     static void decrementCounters(final BatchChoice batchChoice, final SyncCrudCounters counters) {
197         if (batchChoice instanceof FlatBatchAddFlowCase) {
198             counters.getFlowCrudCounts().decAdded();
199         } else if (batchChoice instanceof FlatBatchUpdateFlowCase) {
200             counters.getFlowCrudCounts().decUpdated();
201         } else if (batchChoice instanceof FlatBatchRemoveFlowCase) {
202             counters.getFlowCrudCounts().decRemoved();
203         } else if (batchChoice instanceof FlatBatchAddGroupCase) {
204             counters.getGroupCrudCounts().decAdded();
205         } else if (batchChoice instanceof FlatBatchUpdateGroupCase) {
206             counters.getGroupCrudCounts().decUpdated();
207         } else if (batchChoice instanceof FlatBatchRemoveGroupCase) {
208             counters.getGroupCrudCounts().decRemoved();
209         } else if (batchChoice instanceof FlatBatchAddMeterCase) {
210             counters.getMeterCrudCounts().decAdded();
211         } else if (batchChoice instanceof FlatBatchUpdateMeterCase) {
212             counters.getMeterCrudCounts().decUpdated();
213         } else if (batchChoice instanceof FlatBatchRemoveMeterCase) {
214             counters.getMeterCrudCounts().decRemoved();
215         }
216     }
217
218     static Map<Range<Uint16>, Batch> mapBatchesToRanges(final List<Batch> inputBatchBag, final int failureIndexLimit) {
219         final Map<Range<Uint16>, Batch> batchMap = new LinkedHashMap<>();
220         final PeekingIterator<Batch> batchPeekingIterator = Iterators.peekingIterator(inputBatchBag.iterator());
221         while (batchPeekingIterator.hasNext()) {
222             final Batch batch = batchPeekingIterator.next();
223             final int nextBatchOrder = batchPeekingIterator.hasNext()
224                     ? batchPeekingIterator.peek().getBatchOrder().toJava()
225                     : failureIndexLimit;
226             batchMap.put(Range.closed(batch.getBatchOrder(), Uint16.valueOf(nextBatchOrder - 1)), batch);
227         }
228         return batchMap;
229     }
230
231     @VisibleForTesting
232     static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder,
233             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
234         // process flow remove
235         int order = batchOrder;
236         if (flowItemSyncTableMap != null) {
237             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
238                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
239
240                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
241                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
242                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
243                     int itemOrder = 0;
244                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
245                         flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
246                                 .setBatchOrder(itemOrder++)
247                                 .setFlowId(flow.getId())
248                                 .build());
249                     }
250                     final Batch batch = new BatchBuilder()
251                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
252                                     .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
253                                     .build())
254                             .setBatchOrder(order)
255                             .build();
256                     order += itemOrder;
257                     batchBag.add(batch);
258                 }
259             }
260         }
261         return order;
262     }
263
264     @VisibleForTesting
265     static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder,
266             final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
267         // process group add+update
268         int order = batchOrder;
269         if (groupsToAddOrUpdate != null) {
270             for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
271                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
272                     final List<FlatBatchAddGroup> flatBatchAddGroupBag =
273                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
274                     int itemOrder = 0;
275                     for (Group group : groupItemSyncBox.getItemsToPush()) {
276                         flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group)
277                                 .setBatchOrder(itemOrder++).build());
278                     }
279                     final Batch batch = new BatchBuilder()
280                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
281                                     .setFlatBatchAddGroup(flatBatchAddGroupBag)
282                                     .build())
283                             .setBatchOrder(order)
284                             .build();
285                     order += itemOrder;
286                     batchBag.add(batch);
287                 }
288
289                 if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
290                     final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
291                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
292                     int itemOrder = 0;
293                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
294                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
295                             .setBatchOrder(itemOrder++)
296                             .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
297                             .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
298                             .build());
299                     }
300                     final Batch batch = new BatchBuilder()
301                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
302                                     .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
303                                     .build())
304                             .setBatchOrder(order)
305                             .build();
306                     order += itemOrder;
307                     batchBag.add(batch);
308                 }
309             }
310         }
311         return order;
312     }
313
314     @VisibleForTesting
315     static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder,
316             final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
317         // process group add+update
318         int order = batchOrder;
319         if (groupsToRemoveOrUpdate != null) {
320             for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
321                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
322                     final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
323                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
324                     int itemOrder = 0;
325                     for (Group group : groupItemSyncBox.getItemsToPush()) {
326                         flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group)
327                                 .setBatchOrder(itemOrder++).build());
328                     }
329                     final Batch batch = new BatchBuilder()
330                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
331                                     .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
332                                     .build())
333                             .setBatchOrder(order)
334                             .build();
335                     order += itemOrder;
336                     batchBag.add(batch);
337                 }
338             }
339         }
340         return order;
341     }
342
343     @VisibleForTesting
344     static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder,
345             final ItemSyncBox<Meter> meterItemSyncBox) {
346         // process meter add+update
347         int order = batchOrder;
348         if (meterItemSyncBox != null) {
349             if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
350                 final List<FlatBatchAddMeter> flatBatchAddMeterBag =
351                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
352                 int itemOrder = 0;
353                 for (Meter meter : meterItemSyncBox.getItemsToPush()) {
354                     flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
355                 }
356                 final Batch batch = new BatchBuilder()
357                         .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
358                                 .setFlatBatchAddMeter(flatBatchAddMeterBag)
359                                 .build())
360                         .setBatchOrder(order)
361                         .build();
362                 order += itemOrder;
363                 batchBag.add(batch);
364             }
365
366             if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
367                 final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
368                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
369                 int itemOrder = 0;
370                 for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
371                     flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
372                             .setBatchOrder(itemOrder++)
373                             .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
374                             .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
375                             .build());
376                 }
377                 final Batch batch = new BatchBuilder()
378                         .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
379                                 .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
380                                 .build())
381                         .setBatchOrder(order)
382                         .build();
383                 order += itemOrder;
384                 batchBag.add(batch);
385             }
386         }
387         return order;
388     }
389
390     @VisibleForTesting
391     static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder,
392             final ItemSyncBox<Meter> meterItemSyncBox) {
393         // process meter remove
394         int order = batchOrder;
395         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
396             final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
397                     new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
398             int itemOrder = 0;
399             for (Meter meter : meterItemSyncBox.getItemsToPush()) {
400                 flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
401             }
402             final Batch batch = new BatchBuilder()
403                     .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
404                             .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
405                             .build())
406                     .setBatchOrder(order)
407                     .build();
408             order += itemOrder;
409             batchBag.add(batch);
410         }
411         return order;
412     }
413
414     @VisibleForTesting
415     static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder,
416             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
417         // process flow add+update
418         int order = batchOrder;
419         if (flowItemSyncTableMap != null) {
420             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
421                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
422
423                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
424                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
425                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
426                     int itemOrder = 0;
427                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
428                         flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
429                                 .setBatchOrder(itemOrder++)
430                                 .setFlowId(flow.getId())
431                                 .build());
432                     }
433                     final Batch batch = new BatchBuilder()
434                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
435                                     .setFlatBatchAddFlow(flatBatchAddFlowBag)
436                                     .build())
437                             .setBatchOrder(order)
438                             .build();
439                     order += itemOrder;
440                     batchBag.add(batch);
441                 }
442
443                 if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
444                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
445                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
446                     int itemOrder = 0;
447                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
448                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
449                             .setBatchOrder(itemOrder++)
450                             .setFlowId(flowUpdate.getUpdated().getId())
451                             .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
452                             .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
453                             .build());
454                     }
455                     final Batch batch = new BatchBuilder()
456                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
457                                     .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
458                                     .build())
459                             .setBatchOrder(order)
460                             .build();
461                     order += itemOrder;
462                     batchBag.add(batch);
463                 }
464             }
465         }
466         return order;
467     }
468
469     public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
470         this.flatBatchService = flatBatchService;
471         return this;
472     }
473
474     public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
475         this.tableForwarder = tableForwarder;
476         return this;
477     }
478 }