3c85a7d7ed36fbb1719e2dbbf7b54328654eaf97
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.Iterators;
12 import com.google.common.collect.PeekingIterator;
13 import com.google.common.collect.Range;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
24 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
25 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.Uint16;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 /**
89  * Execute CRUD API for flow + group + meter involving flat-batch strategy.
90  */
91 public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
92
93     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
94
95     private SalFlatBatchService flatBatchService;
96     private TableForwarder tableForwarder;
97
98     @Override
99     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
100                                                                  final SynchronizationDiffInput diffInput,
101                                                                  final SyncCrudCounters counters) {
102         // prepare default (full) counts
103         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
104         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
105         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
106
107         counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(
108                 diffInput.getFlowsToAddOrUpdate().values()));
109         counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(
110                 diffInput.getFlowsToAddOrUpdate().values()));
111         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
112
113         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
114         counters.getMeterCrudCounts().setUpdated(diffInput.getMetersToAddOrUpdate().getItemsToUpdate().size());
115         counters.getMeterCrudCounts().setRemoved(diffInput.getMetersToRemove().getItemsToPush().size());
116
117         /* Tables - have to be pushed before groups */
118         // TODO enable table-update when ready
119         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
120
121         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
122             final List<Batch> batchBag = new ArrayList<>();
123             int batchOrder = 0;
124
125             batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
126             batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
127             batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
128
129             batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
130             batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
131             batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
132
133             LOG.trace("Index of last batch step: {}", batchOrder);
134
135             final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
136                     .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
137                     // TODO: propagate from input
138                     .setExitOnFirstError(false)
139                     .setBatch(batchBag)
140                     .build();
141
142             final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture =
143                     flatBatchService.processFlatBatch(flatBatchInput);
144
145             if (LOG.isDebugEnabled()) {
146                 Futures.addCallback(rpcResultFuture, createCounterCallback(batchBag, batchOrder, counters),
147                     MoreExecutors.directExecutor());
148             }
149
150             return Futures.transform(rpcResultFuture,
151                     ReconcileUtil.createRpcResultToVoidFunction("flat-batch"),
152                     MoreExecutors.directExecutor());
153         }, MoreExecutors.directExecutor());
154         return resultVehicle;
155     }
156
157     private FutureCallback<RpcResult<ProcessFlatBatchOutput>> createCounterCallback(final List<Batch> inputBatchBag,
158                                                                                     final int failureIndexLimit,
159                                                                                     final SyncCrudCounters counters) {
160         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
161             @Override
162             public void onSuccess(final RpcResult<ProcessFlatBatchOutput> result) {
163                 if (!result.isSuccessful() && result.getResult() != null
164                         && !result.getResult().getBatchFailure().isEmpty()) {
165                     Map<Range<Uint16>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
166                     decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
167                 }
168             }
169
170             @Override
171             public void onFailure(final Throwable failure) {
172                 counters.resetAll();
173             }
174         };
175     }
176
177     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
178             justification = "https://github.com/spotbugs/spotbugs/issues/811")
179     private static void decrementBatchFailuresCounters(final List<BatchFailure> batchFailures,
180                                                 final Map<Range<Uint16>, Batch> batchMap,
181                                                 final SyncCrudCounters counters) {
182         for (BatchFailure batchFailure : batchFailures) {
183             for (Map.Entry<Range<Uint16>, Batch> rangeBatchEntry : batchMap.entrySet()) {
184                 if (rangeBatchEntry.getKey().contains(batchFailure.getBatchOrder())) {
185                     // get type and decrease
186                     final BatchChoice batchChoice = rangeBatchEntry.getValue().getBatchChoice();
187                     decrementCounters(batchChoice, counters);
188                     break;
189                 }
190             }
191         }
192     }
193
194     static void decrementCounters(final BatchChoice batchChoice, final SyncCrudCounters counters) {
195         if (batchChoice instanceof FlatBatchAddFlowCase) {
196             counters.getFlowCrudCounts().decAdded();
197         } else if (batchChoice instanceof FlatBatchUpdateFlowCase) {
198             counters.getFlowCrudCounts().decUpdated();
199         } else if (batchChoice instanceof FlatBatchRemoveFlowCase) {
200             counters.getFlowCrudCounts().decRemoved();
201         } else if (batchChoice instanceof FlatBatchAddGroupCase) {
202             counters.getGroupCrudCounts().decAdded();
203         } else if (batchChoice instanceof FlatBatchUpdateGroupCase) {
204             counters.getGroupCrudCounts().decUpdated();
205         } else if (batchChoice instanceof FlatBatchRemoveGroupCase) {
206             counters.getGroupCrudCounts().decRemoved();
207         } else if (batchChoice instanceof FlatBatchAddMeterCase) {
208             counters.getMeterCrudCounts().decAdded();
209         } else if (batchChoice instanceof FlatBatchUpdateMeterCase) {
210             counters.getMeterCrudCounts().decUpdated();
211         } else if (batchChoice instanceof FlatBatchRemoveMeterCase) {
212             counters.getMeterCrudCounts().decRemoved();
213         }
214     }
215
216     static Map<Range<Uint16>, Batch> mapBatchesToRanges(final List<Batch> inputBatchBag, final int failureIndexLimit) {
217         final Map<Range<Uint16>, Batch> batchMap = new LinkedHashMap<>();
218         final PeekingIterator<Batch> batchPeekingIterator = Iterators.peekingIterator(inputBatchBag.iterator());
219         while (batchPeekingIterator.hasNext()) {
220             final Batch batch = batchPeekingIterator.next();
221             final int nextBatchOrder = batchPeekingIterator.hasNext()
222                     ? batchPeekingIterator.peek().getBatchOrder().toJava()
223                     : failureIndexLimit;
224             batchMap.put(Range.closed(batch.getBatchOrder(), Uint16.valueOf(nextBatchOrder - 1)), batch);
225         }
226         return batchMap;
227     }
228
229     @VisibleForTesting
230     static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder,
231             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
232         // process flow remove
233         int order = batchOrder;
234         if (flowItemSyncTableMap != null) {
235             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
236                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
237
238                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
239                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
240                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
241                     int itemOrder = 0;
242                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
243                         flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
244                                 .setBatchOrder(itemOrder++)
245                                 .setFlowId(flow.getId())
246                                 .build());
247                     }
248                     final Batch batch = new BatchBuilder()
249                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
250                                     .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
251                                     .build())
252                             .setBatchOrder(order)
253                             .build();
254                     order += itemOrder;
255                     batchBag.add(batch);
256                 }
257             }
258         }
259         return order;
260     }
261
262     @VisibleForTesting
263     static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder,
264             final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
265         // process group add+update
266         int order = batchOrder;
267         if (groupsToAddOrUpdate != null) {
268             for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
269                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
270                     final List<FlatBatchAddGroup> flatBatchAddGroupBag =
271                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
272                     int itemOrder = 0;
273                     for (Group group : groupItemSyncBox.getItemsToPush()) {
274                         flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group)
275                                 .setBatchOrder(itemOrder++).build());
276                     }
277                     final Batch batch = new BatchBuilder()
278                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
279                                     .setFlatBatchAddGroup(flatBatchAddGroupBag)
280                                     .build())
281                             .setBatchOrder(order)
282                             .build();
283                     order += itemOrder;
284                     batchBag.add(batch);
285                 }
286
287                 if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
288                     final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
289                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
290                     int itemOrder = 0;
291                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
292                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
293                             .setBatchOrder(itemOrder++)
294                             .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
295                             .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
296                             .build());
297                     }
298                     final Batch batch = new BatchBuilder()
299                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
300                                     .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
301                                     .build())
302                             .setBatchOrder(order)
303                             .build();
304                     order += itemOrder;
305                     batchBag.add(batch);
306                 }
307             }
308         }
309         return order;
310     }
311
312     @VisibleForTesting
313     static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder,
314             final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
315         // process group add+update
316         int order = batchOrder;
317         if (groupsToRemoveOrUpdate != null) {
318             for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
319                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
320                     final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
321                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
322                     int itemOrder = 0;
323                     for (Group group : groupItemSyncBox.getItemsToPush()) {
324                         flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group)
325                                 .setBatchOrder(itemOrder++).build());
326                     }
327                     final Batch batch = new BatchBuilder()
328                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
329                                     .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
330                                     .build())
331                             .setBatchOrder(order)
332                             .build();
333                     order += itemOrder;
334                     batchBag.add(batch);
335                 }
336             }
337         }
338         return order;
339     }
340
341     @VisibleForTesting
342     static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder,
343             final ItemSyncBox<Meter> meterItemSyncBox) {
344         // process meter add+update
345         int order = batchOrder;
346         if (meterItemSyncBox != null) {
347             if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
348                 final List<FlatBatchAddMeter> flatBatchAddMeterBag =
349                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
350                 int itemOrder = 0;
351                 for (Meter meter : meterItemSyncBox.getItemsToPush()) {
352                     flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
353                 }
354                 final Batch batch = new BatchBuilder()
355                         .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
356                                 .setFlatBatchAddMeter(flatBatchAddMeterBag)
357                                 .build())
358                         .setBatchOrder(order)
359                         .build();
360                 order += itemOrder;
361                 batchBag.add(batch);
362             }
363
364             if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
365                 final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
366                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
367                 int itemOrder = 0;
368                 for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
369                     flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
370                             .setBatchOrder(itemOrder++)
371                             .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
372                             .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
373                             .build());
374                 }
375                 final Batch batch = new BatchBuilder()
376                         .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
377                                 .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
378                                 .build())
379                         .setBatchOrder(order)
380                         .build();
381                 order += itemOrder;
382                 batchBag.add(batch);
383             }
384         }
385         return order;
386     }
387
388     @VisibleForTesting
389     static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder,
390             final ItemSyncBox<Meter> meterItemSyncBox) {
391         // process meter remove
392         int order = batchOrder;
393         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
394             final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
395                     new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
396             int itemOrder = 0;
397             for (Meter meter : meterItemSyncBox.getItemsToPush()) {
398                 flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
399             }
400             final Batch batch = new BatchBuilder()
401                     .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
402                             .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
403                             .build())
404                     .setBatchOrder(order)
405                     .build();
406             order += itemOrder;
407             batchBag.add(batch);
408         }
409         return order;
410     }
411
412     @VisibleForTesting
413     static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder,
414             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
415         // process flow add+update
416         int order = batchOrder;
417         if (flowItemSyncTableMap != null) {
418             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
419                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
420
421                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
422                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
423                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
424                     int itemOrder = 0;
425                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
426                         flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
427                                 .setBatchOrder(itemOrder++)
428                                 .setFlowId(flow.getId())
429                                 .build());
430                     }
431                     final Batch batch = new BatchBuilder()
432                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
433                                     .setFlatBatchAddFlow(flatBatchAddFlowBag)
434                                     .build())
435                             .setBatchOrder(order)
436                             .build();
437                     order += itemOrder;
438                     batchBag.add(batch);
439                 }
440
441                 if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
442                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
443                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
444                     int itemOrder = 0;
445                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
446                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
447                             .setBatchOrder(itemOrder++)
448                             .setFlowId(flowUpdate.getUpdated().getId())
449                             .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
450                             .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
451                             .build());
452                     }
453                     final Batch batch = new BatchBuilder()
454                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
455                                     .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
456                                     .build())
457                             .setBatchOrder(order)
458                             .build();
459                     order += itemOrder;
460                     batchBag.add(batch);
461                 }
462             }
463         }
464         return order;
465     }
466
467     public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
468         this.flatBatchService = flatBatchService;
469         return this;
470     }
471
472     public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
473         this.tableForwarder = tableForwarder;
474         return this;
475     }
476 }