Merge remote-tracking branch 'liblldp/master'
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.collect.Iterators;
13 import com.google.common.collect.PeekingIterator;
14 import com.google.common.collect.Range;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.JdkFutureAdapters;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.util.ArrayList;
22 import java.util.LinkedHashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Future;
26 import javax.annotation.Nullable;
27 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
28 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
29 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
30 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
31 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
87 import org.opendaylight.yangtools.yang.common.RpcResult;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 /**
92  * Execute CRUD API for flow + group + meter involving flat-batch strategy.
93  */
94 public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
95
96     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
97
98     private SalFlatBatchService flatBatchService;
99     private TableForwarder tableForwarder;
100
101     @Override
102     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
103                                                                  final SynchronizationDiffInput diffInput,
104                                                                  final SyncCrudCounters counters) {
105         // prepare default (full) counts
106         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
107         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
108         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
109
110         counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getFlowsToAddOrUpdate().values()));
111         counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getFlowsToAddOrUpdate().values()));
112         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
113
114         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
115         counters.getMeterCrudCounts().setUpdated(diffInput.getMetersToAddOrUpdate().getItemsToUpdate().size());
116         counters.getMeterCrudCounts().setRemoved(diffInput.getMetersToRemove().getItemsToPush().size());
117
118         /* Tables - have to be pushed before groups */
119         // TODO enable table-update when ready
120         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
121
122         resultVehicle = Futures.transformAsync(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
123             @Override
124             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
125                 final List<Batch> batchBag = new ArrayList<>();
126                 int batchOrder = 0;
127
128                 batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
129                 batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
130                 batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
131
132                 batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
133                 batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
134                 batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
135
136                 LOG.trace("Index of last batch step: {}", batchOrder);
137
138                 final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
139                         .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
140                         // TODO: propagate from input
141                         .setExitOnFirstError(false)
142                         .setBatch(batchBag)
143                         .build();
144
145                 final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
146
147                 if (LOG.isDebugEnabled()) {
148                     Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
149                             createCounterCallback(batchBag, batchOrder, counters), MoreExecutors.directExecutor());
150                 }
151
152                 return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
153                         ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
154             }
155         });
156         return resultVehicle;
157     }
158
159     private FutureCallback<RpcResult<ProcessFlatBatchOutput>> createCounterCallback(final List<Batch> inputBatchBag,
160                                                                                     final int failureIndexLimit,
161                                                                                     final SyncCrudCounters counters) {
162         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
163             @Override
164             public void onSuccess(@Nullable final RpcResult<ProcessFlatBatchOutput> result) {
165                 if (!result.isSuccessful() && result.getResult() != null && !result.getResult().getBatchFailure().isEmpty()) {
166                     Map<Range<Integer>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
167                     decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
168                 }
169             }
170
171             @Override
172             public void onFailure(final Throwable t) {
173                 counters.resetAll();
174             }
175         };
176     }
177
178     private static void decrementBatchFailuresCounters(final List<BatchFailure> batchFailures,
179                                                 final Map<Range<Integer>, Batch> batchMap,
180                                                 final SyncCrudCounters counters) {
181         for (BatchFailure batchFailure : batchFailures) {
182             for (Map.Entry<Range<Integer>, Batch> rangeBatchEntry : batchMap.entrySet()) {
183                 if (rangeBatchEntry.getKey().contains(batchFailure.getBatchOrder())) {
184                     // get type and decrease
185                     final BatchChoice batchChoice = rangeBatchEntry.getValue().getBatchChoice();
186                     decrementCounters(batchChoice, counters);
187                     break;
188                 }
189             }
190         }
191     }
192
193     static void decrementCounters(final BatchChoice batchChoice, final SyncCrudCounters counters) {
194         if (batchChoice instanceof FlatBatchAddFlowCase) {
195             counters.getFlowCrudCounts().decAdded();
196         } else if (batchChoice instanceof FlatBatchUpdateFlowCase) {
197             counters.getFlowCrudCounts().decUpdated();
198         } else if (batchChoice instanceof FlatBatchRemoveFlowCase) {
199             counters.getFlowCrudCounts().decRemoved();
200         } else if (batchChoice instanceof FlatBatchAddGroupCase) {
201             counters.getGroupCrudCounts().decAdded();
202         } else if (batchChoice instanceof FlatBatchUpdateGroupCase) {
203             counters.getGroupCrudCounts().decUpdated();
204         } else if (batchChoice instanceof FlatBatchRemoveGroupCase) {
205             counters.getGroupCrudCounts().decRemoved();
206         } else if (batchChoice instanceof FlatBatchAddMeterCase) {
207             counters.getMeterCrudCounts().decAdded();
208         } else if (batchChoice instanceof FlatBatchUpdateMeterCase) {
209             counters.getMeterCrudCounts().decUpdated();
210         } else if (batchChoice instanceof FlatBatchRemoveMeterCase) {
211             counters.getMeterCrudCounts().decRemoved();
212         }
213     }
214
215     static Map<Range<Integer>, Batch> mapBatchesToRanges(final List<Batch> inputBatchBag, final int failureIndexLimit) {
216         final Map<Range<Integer>, Batch> batchMap = new LinkedHashMap<>();
217         final PeekingIterator<Batch> batchPeekingIterator = Iterators.peekingIterator(inputBatchBag.iterator());
218         while (batchPeekingIterator.hasNext()) {
219             final Batch batch = batchPeekingIterator.next();
220             final int nextBatchOrder = batchPeekingIterator.hasNext()
221                     ? batchPeekingIterator.peek().getBatchOrder()
222                     : failureIndexLimit;
223             batchMap.put(Range.closed(batch.getBatchOrder(), nextBatchOrder - 1), batch);
224         }
225         return batchMap;
226     }
227
228     @VisibleForTesting
229     static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
230         // process flow remove
231         int order = batchOrder;
232         if (flowItemSyncTableMap != null) {
233             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
234                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
235
236                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
237                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
238                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
239                     int itemOrder = 0;
240                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
241                         flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
242                                 .setBatchOrder(itemOrder++)
243                                 .setFlowId(flow.getId())
244                                 .build());
245                     }
246                     final Batch batch = new BatchBuilder()
247                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
248                                     .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
249                                     .build())
250                             .setBatchOrder(order)
251                             .build();
252                     order += itemOrder;
253                     batchBag.add(batch);
254                 }
255             }
256         }
257         return order;
258     }
259
260     @VisibleForTesting
261     static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
262         // process group add+update
263         int order = batchOrder;
264         if (groupsToAddOrUpdate != null) {
265             for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
266                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
267                     final List<FlatBatchAddGroup> flatBatchAddGroupBag =
268                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
269                     int itemOrder = 0;
270                     for (Group group : groupItemSyncBox.getItemsToPush()) {
271                         flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group).setBatchOrder(itemOrder++).build());
272                     }
273                     final Batch batch = new BatchBuilder()
274                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
275                                     .setFlatBatchAddGroup(flatBatchAddGroupBag)
276                                     .build())
277                             .setBatchOrder(order)
278                             .build();
279                     order += itemOrder;
280                     batchBag.add(batch);
281                 }
282
283                 if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
284                     final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
285                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
286                     int itemOrder = 0;
287                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
288                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
289                                 .setBatchOrder(itemOrder++)
290                                 .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
291                                 .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
292                                 .build());
293                     }
294                     final Batch batch = new BatchBuilder()
295                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
296                                     .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
297                                     .build())
298                             .setBatchOrder(order)
299                             .build();
300                     order += itemOrder;
301                     batchBag.add(batch);
302                 }
303             }
304         }
305         return order;
306     }
307
308     @VisibleForTesting
309     static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
310         // process group add+update
311         int order = batchOrder;
312         if (groupsToRemoveOrUpdate != null) {
313             for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
314                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
315                     final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
316                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
317                     int itemOrder = 0;
318                     for (Group group : groupItemSyncBox.getItemsToPush()) {
319                         flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group).setBatchOrder(itemOrder++).build());
320                     }
321                     final Batch batch = new BatchBuilder()
322                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
323                                     .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
324                                     .build())
325                             .setBatchOrder(order)
326                             .build();
327                     order += itemOrder;
328                     batchBag.add(batch);
329                 }
330             }
331         }
332         return order;
333     }
334
335     @VisibleForTesting
336     static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
337         // process meter add+update
338         int order = batchOrder;
339         if (meterItemSyncBox != null) {
340             if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
341                 final List<FlatBatchAddMeter> flatBatchAddMeterBag =
342                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
343                 int itemOrder = 0;
344                 for (Meter meter : meterItemSyncBox.getItemsToPush()) {
345                     flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
346                 }
347                 final Batch batch = new BatchBuilder()
348                         .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
349                                 .setFlatBatchAddMeter(flatBatchAddMeterBag)
350                                 .build())
351                         .setBatchOrder(order)
352                         .build();
353                 order += itemOrder;
354                 batchBag.add(batch);
355             }
356
357             if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
358                 final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
359                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
360                 int itemOrder = 0;
361                 for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
362                     flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
363                             .setBatchOrder(itemOrder++)
364                             .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
365                             .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
366                             .build());
367                 }
368                 final Batch batch = new BatchBuilder()
369                         .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
370                                 .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
371                                 .build())
372                         .setBatchOrder(order)
373                         .build();
374                 order += itemOrder;
375                 batchBag.add(batch);
376             }
377         }
378         return order;
379     }
380
381     @VisibleForTesting
382     static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
383         // process meter remove
384         int order = batchOrder;
385         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
386             final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
387                     new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
388             int itemOrder = 0;
389             for (Meter meter : meterItemSyncBox.getItemsToPush()) {
390                 flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
391             }
392             final Batch batch = new BatchBuilder()
393                     .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
394                             .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
395                             .build())
396                     .setBatchOrder(order)
397                     .build();
398             order += itemOrder;
399             batchBag.add(batch);
400         }
401         return order;
402     }
403
404     @VisibleForTesting
405     static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
406         // process flow add+update
407         int order = batchOrder;
408         if (flowItemSyncTableMap != null) {
409             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
410                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
411
412                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
413                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
414                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
415                     int itemOrder = 0;
416                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
417                         flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
418                                 .setBatchOrder(itemOrder++)
419                                 .setFlowId(flow.getId())
420                                 .build());
421                     }
422                     final Batch batch = new BatchBuilder()
423                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
424                                     .setFlatBatchAddFlow(flatBatchAddFlowBag)
425                                     .build())
426                             .setBatchOrder(order)
427                             .build();
428                     order += itemOrder;
429                     batchBag.add(batch);
430                 }
431
432                 if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
433                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
434                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
435                     int itemOrder = 0;
436                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
437                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
438                                 .setBatchOrder(itemOrder++)
439                                 .setFlowId(flowUpdate.getUpdated().getId())
440                                 .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
441                                 .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
442                                 .build());
443                     }
444                     final Batch batch = new BatchBuilder()
445                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
446                                     .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
447                                     .build())
448                             .setBatchOrder(order)
449                             .build();
450                     order += itemOrder;
451                     batchBag.add(batch);
452                 }
453             }
454         }
455         return order;
456     }
457
458     public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
459         this.flatBatchService = flatBatchService;
460         return this;
461     }
462
463     public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
464         this.tableForwarder = tableForwarder;
465         return this;
466     }
467 }