Reduce use of JdkFutureAdapters
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.collect.Iterators;
13 import com.google.common.collect.PeekingIterator;
14 import com.google.common.collect.Range;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
24 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
25 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 /**
88  * Execute CRUD API for flow + group + meter involving flat-batch strategy.
89  */
90 public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
91
92     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
93
94     private SalFlatBatchService flatBatchService;
95     private TableForwarder tableForwarder;
96
97     @Override
98     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
99                                                                  final SynchronizationDiffInput diffInput,
100                                                                  final SyncCrudCounters counters) {
101         // prepare default (full) counts
102         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
103         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
104         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
105
106         counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(
107                 diffInput.getFlowsToAddOrUpdate().values()));
108         counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(
109                 diffInput.getFlowsToAddOrUpdate().values()));
110         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
111
112         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
113         counters.getMeterCrudCounts().setUpdated(diffInput.getMetersToAddOrUpdate().getItemsToUpdate().size());
114         counters.getMeterCrudCounts().setRemoved(diffInput.getMetersToRemove().getItemsToPush().size());
115
116         /* Tables - have to be pushed before groups */
117         // TODO enable table-update when ready
118         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
119
120         resultVehicle = Futures.transformAsync(resultVehicle, input -> {
121             final List<Batch> batchBag = new ArrayList<>();
122             int batchOrder = 0;
123
124             batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
125             batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
126             batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
127
128             batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
129             batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
130             batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
131
132             LOG.trace("Index of last batch step: {}", batchOrder);
133
134             final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
135                     .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
136                     // TODO: propagate from input
137                     .setExitOnFirstError(false)
138                     .setBatch(batchBag)
139                     .build();
140
141             final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture =
142                     flatBatchService.processFlatBatch(flatBatchInput);
143
144             if (LOG.isDebugEnabled()) {
145                 Futures.addCallback(rpcResultFuture, createCounterCallback(batchBag, batchOrder, counters),
146                     MoreExecutors.directExecutor());
147             }
148
149             return Futures.transform(rpcResultFuture,
150                     ReconcileUtil.createRpcResultToVoidFunction("flat-batch"),
151                     MoreExecutors.directExecutor());
152         }, MoreExecutors.directExecutor());
153         return resultVehicle;
154     }
155
156     private FutureCallback<RpcResult<ProcessFlatBatchOutput>> createCounterCallback(final List<Batch> inputBatchBag,
157                                                                                     final int failureIndexLimit,
158                                                                                     final SyncCrudCounters counters) {
159         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
160             @Override
161             public void onSuccess(final RpcResult<ProcessFlatBatchOutput> result) {
162                 if (!result.isSuccessful() && result.getResult() != null
163                         && !result.getResult().getBatchFailure().isEmpty()) {
164                     Map<Range<Integer>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
165                     decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
166                 }
167             }
168
169             @Override
170             public void onFailure(final Throwable failure) {
171                 counters.resetAll();
172             }
173         };
174     }
175
176     private static void decrementBatchFailuresCounters(final List<BatchFailure> batchFailures,
177                                                 final Map<Range<Integer>, Batch> batchMap,
178                                                 final SyncCrudCounters counters) {
179         for (BatchFailure batchFailure : batchFailures) {
180             for (Map.Entry<Range<Integer>, Batch> rangeBatchEntry : batchMap.entrySet()) {
181                 if (rangeBatchEntry.getKey().contains(batchFailure.getBatchOrder())) {
182                     // get type and decrease
183                     final BatchChoice batchChoice = rangeBatchEntry.getValue().getBatchChoice();
184                     decrementCounters(batchChoice, counters);
185                     break;
186                 }
187             }
188         }
189     }
190
191     static void decrementCounters(final BatchChoice batchChoice, final SyncCrudCounters counters) {
192         if (batchChoice instanceof FlatBatchAddFlowCase) {
193             counters.getFlowCrudCounts().decAdded();
194         } else if (batchChoice instanceof FlatBatchUpdateFlowCase) {
195             counters.getFlowCrudCounts().decUpdated();
196         } else if (batchChoice instanceof FlatBatchRemoveFlowCase) {
197             counters.getFlowCrudCounts().decRemoved();
198         } else if (batchChoice instanceof FlatBatchAddGroupCase) {
199             counters.getGroupCrudCounts().decAdded();
200         } else if (batchChoice instanceof FlatBatchUpdateGroupCase) {
201             counters.getGroupCrudCounts().decUpdated();
202         } else if (batchChoice instanceof FlatBatchRemoveGroupCase) {
203             counters.getGroupCrudCounts().decRemoved();
204         } else if (batchChoice instanceof FlatBatchAddMeterCase) {
205             counters.getMeterCrudCounts().decAdded();
206         } else if (batchChoice instanceof FlatBatchUpdateMeterCase) {
207             counters.getMeterCrudCounts().decUpdated();
208         } else if (batchChoice instanceof FlatBatchRemoveMeterCase) {
209             counters.getMeterCrudCounts().decRemoved();
210         }
211     }
212
213     static Map<Range<Integer>, Batch> mapBatchesToRanges(final List<Batch> inputBatchBag, final int failureIndexLimit) {
214         final Map<Range<Integer>, Batch> batchMap = new LinkedHashMap<>();
215         final PeekingIterator<Batch> batchPeekingIterator = Iterators.peekingIterator(inputBatchBag.iterator());
216         while (batchPeekingIterator.hasNext()) {
217             final Batch batch = batchPeekingIterator.next();
218             final int nextBatchOrder = batchPeekingIterator.hasNext()
219                     ? batchPeekingIterator.peek().getBatchOrder()
220                     : failureIndexLimit;
221             batchMap.put(Range.closed(batch.getBatchOrder(), nextBatchOrder - 1), batch);
222         }
223         return batchMap;
224     }
225
226     @VisibleForTesting
227     static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder,
228             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
229         // process flow remove
230         int order = batchOrder;
231         if (flowItemSyncTableMap != null) {
232             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
233                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
234
235                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
236                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
237                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
238                     int itemOrder = 0;
239                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
240                         flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
241                                 .setBatchOrder(itemOrder++)
242                                 .setFlowId(flow.getId())
243                                 .build());
244                     }
245                     final Batch batch = new BatchBuilder()
246                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
247                                     .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
248                                     .build())
249                             .setBatchOrder(order)
250                             .build();
251                     order += itemOrder;
252                     batchBag.add(batch);
253                 }
254             }
255         }
256         return order;
257     }
258
259     @VisibleForTesting
260     static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder,
261             final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
262         // process group add+update
263         int order = batchOrder;
264         if (groupsToAddOrUpdate != null) {
265             for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
266                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
267                     final List<FlatBatchAddGroup> flatBatchAddGroupBag =
268                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
269                     int itemOrder = 0;
270                     for (Group group : groupItemSyncBox.getItemsToPush()) {
271                         flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group)
272                                 .setBatchOrder(itemOrder++).build());
273                     }
274                     final Batch batch = new BatchBuilder()
275                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
276                                     .setFlatBatchAddGroup(flatBatchAddGroupBag)
277                                     .build())
278                             .setBatchOrder(order)
279                             .build();
280                     order += itemOrder;
281                     batchBag.add(batch);
282                 }
283
284                 if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
285                     final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
286                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
287                     int itemOrder = 0;
288                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
289                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
290                             .setBatchOrder(itemOrder++)
291                             .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
292                             .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
293                             .build());
294                     }
295                     final Batch batch = new BatchBuilder()
296                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
297                                     .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
298                                     .build())
299                             .setBatchOrder(order)
300                             .build();
301                     order += itemOrder;
302                     batchBag.add(batch);
303                 }
304             }
305         }
306         return order;
307     }
308
309     @VisibleForTesting
310     static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder,
311             final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
312         // process group add+update
313         int order = batchOrder;
314         if (groupsToRemoveOrUpdate != null) {
315             for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
316                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
317                     final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
318                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
319                     int itemOrder = 0;
320                     for (Group group : groupItemSyncBox.getItemsToPush()) {
321                         flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group)
322                                 .setBatchOrder(itemOrder++).build());
323                     }
324                     final Batch batch = new BatchBuilder()
325                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
326                                     .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
327                                     .build())
328                             .setBatchOrder(order)
329                             .build();
330                     order += itemOrder;
331                     batchBag.add(batch);
332                 }
333             }
334         }
335         return order;
336     }
337
338     @VisibleForTesting
339     static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder,
340             final ItemSyncBox<Meter> meterItemSyncBox) {
341         // process meter add+update
342         int order = batchOrder;
343         if (meterItemSyncBox != null) {
344             if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
345                 final List<FlatBatchAddMeter> flatBatchAddMeterBag =
346                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
347                 int itemOrder = 0;
348                 for (Meter meter : meterItemSyncBox.getItemsToPush()) {
349                     flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
350                 }
351                 final Batch batch = new BatchBuilder()
352                         .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
353                                 .setFlatBatchAddMeter(flatBatchAddMeterBag)
354                                 .build())
355                         .setBatchOrder(order)
356                         .build();
357                 order += itemOrder;
358                 batchBag.add(batch);
359             }
360
361             if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
362                 final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
363                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
364                 int itemOrder = 0;
365                 for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
366                     flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
367                             .setBatchOrder(itemOrder++)
368                             .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
369                             .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
370                             .build());
371                 }
372                 final Batch batch = new BatchBuilder()
373                         .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
374                                 .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
375                                 .build())
376                         .setBatchOrder(order)
377                         .build();
378                 order += itemOrder;
379                 batchBag.add(batch);
380             }
381         }
382         return order;
383     }
384
385     @VisibleForTesting
386     static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder,
387             final ItemSyncBox<Meter> meterItemSyncBox) {
388         // process meter remove
389         int order = batchOrder;
390         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
391             final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
392                     new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
393             int itemOrder = 0;
394             for (Meter meter : meterItemSyncBox.getItemsToPush()) {
395                 flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
396             }
397             final Batch batch = new BatchBuilder()
398                     .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
399                             .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
400                             .build())
401                     .setBatchOrder(order)
402                     .build();
403             order += itemOrder;
404             batchBag.add(batch);
405         }
406         return order;
407     }
408
409     @VisibleForTesting
410     static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder,
411             final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
412         // process flow add+update
413         int order = batchOrder;
414         if (flowItemSyncTableMap != null) {
415             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
416                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
417
418                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
419                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
420                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
421                     int itemOrder = 0;
422                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
423                         flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
424                                 .setBatchOrder(itemOrder++)
425                                 .setFlowId(flow.getId())
426                                 .build());
427                     }
428                     final Batch batch = new BatchBuilder()
429                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
430                                     .setFlatBatchAddFlow(flatBatchAddFlowBag)
431                                     .build())
432                             .setBatchOrder(order)
433                             .build();
434                     order += itemOrder;
435                     batchBag.add(batch);
436                 }
437
438                 if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
439                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
440                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
441                     int itemOrder = 0;
442                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
443                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
444                             .setBatchOrder(itemOrder++)
445                             .setFlowId(flowUpdate.getUpdated().getId())
446                             .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
447                             .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
448                             .build());
449                     }
450                     final Batch batch = new BatchBuilder()
451                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
452                                     .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
453                                     .build())
454                             .setBatchOrder(order)
455                             .build();
456                     order += itemOrder;
457                     batchBag.add(batch);
458                 }
459             }
460         }
461         return order;
462     }
463
464     public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
465         this.flatBatchService = flatBatchService;
466         return this;
467     }
468
469     public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
470         this.tableForwarder = tableForwarder;
471         return this;
472     }
473 }