Merge "Adding learn action"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.collect.Iterators;
13 import com.google.common.collect.PeekingIterator;
14 import com.google.common.collect.Range;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.JdkFutureAdapters;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import java.util.ArrayList;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Future;
25 import javax.annotation.Nullable;
26 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
27 import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
28 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
29 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
30 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
31 import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
89 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
90 import org.opendaylight.yangtools.yang.common.RpcResult;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93
94 /**
95  * Execute CRUD API for flow + group + meter involving flat-batch strategy.
96  */
97 public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
98
99     private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
100
101     private SalFlatBatchService flatBatchService;
102     private TableForwarder tableForwarder;
103
104     @Override
105     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
106                                                                  final SynchronizationDiffInput diffInput,
107                                                                  final SyncCrudCounters counters) {
108         final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
109         final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
110
111         // prepare default (full) counts
112         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
113         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
114         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
115
116         counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getFlowsToAddOrUpdate().values()));
117         counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getFlowsToAddOrUpdate().values()));
118         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
119
120         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
121         counters.getMeterCrudCounts().setUpdated(diffInput.getMetersToAddOrUpdate().getItemsToUpdate().size());
122         counters.getMeterCrudCounts().setRemoved(diffInput.getMetersToRemove().getItemsToPush().size());
123
124         /* Tables - have to be pushed before groups */
125         // TODO enable table-update when ready
126         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
127
128         resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
129             @Override
130             public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
131                 final List<Batch> batchBag = new ArrayList<>();
132                 int batchOrder = 0;
133
134                 batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
135                 batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
136                 batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
137
138                 batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
139                 batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
140                 batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
141
142                 LOG.trace("Index of last batch step: {}", batchOrder);
143
144                 final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
145                         .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
146                         // TODO: propagate from input
147                         .setExitOnFirstError(false)
148                         .setBatch(batchBag)
149                         .build();
150
151                 final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
152
153                 final int failureIndexLimit = batchOrder;
154
155                 if (LOG.isDebugEnabled()) {
156                     Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
157                             createCounterCallback(batchBag, failureIndexLimit, counters));
158                 }
159
160                 return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
161                         ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
162             }
163         });
164
165         Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "flat-batch"));
166         return resultVehicle;
167     }
168
169     private FutureCallback<RpcResult<ProcessFlatBatchOutput>> createCounterCallback(final List<Batch> inputBatchBag,
170                                                                                     final int failureIndexLimit,
171                                                                                     final SyncCrudCounters counters) {
172         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
173             @Override
174             public void onSuccess(@Nullable final RpcResult<ProcessFlatBatchOutput> result) {
175                 if (!result.isSuccessful() && result.getResult() != null && !result.getResult().getBatchFailure().isEmpty()) {
176                     Map<Range<Integer>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
177                     decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
178                 }
179             }
180
181             @Override
182             public void onFailure(final Throwable t) {
183                 counters.resetAll();
184             }
185         };
186     }
187
188     private static void decrementBatchFailuresCounters(final List<BatchFailure> batchFailures,
189                                                 final Map<Range<Integer>, Batch> batchMap,
190                                                 final SyncCrudCounters counters) {
191         for (BatchFailure batchFailure : batchFailures) {
192             for (Map.Entry<Range<Integer>, Batch> rangeBatchEntry : batchMap.entrySet()) {
193                 if (rangeBatchEntry.getKey().contains(batchFailure.getBatchOrder())) {
194                     // get type and decrease
195                     final BatchChoice batchChoice = rangeBatchEntry.getValue().getBatchChoice();
196                     decrementCounters(batchChoice, counters);
197                     break;
198                 }
199             }
200         }
201     }
202
203     static void decrementCounters(final BatchChoice batchChoice, final SyncCrudCounters counters) {
204         if (batchChoice instanceof FlatBatchAddFlowCase) {
205             counters.getFlowCrudCounts().decAdded();
206         } else if (batchChoice instanceof FlatBatchUpdateFlowCase) {
207             counters.getFlowCrudCounts().decUpdated();
208         } else if (batchChoice instanceof FlatBatchRemoveFlowCase) {
209             counters.getFlowCrudCounts().decRemoved();
210         } else if (batchChoice instanceof FlatBatchAddGroupCase) {
211             counters.getGroupCrudCounts().decAdded();
212         } else if (batchChoice instanceof FlatBatchUpdateGroupCase) {
213             counters.getGroupCrudCounts().decUpdated();
214         } else if (batchChoice instanceof FlatBatchRemoveGroupCase) {
215             counters.getGroupCrudCounts().decRemoved();
216         } else if (batchChoice instanceof FlatBatchAddMeterCase) {
217             counters.getMeterCrudCounts().decAdded();
218         } else if (batchChoice instanceof FlatBatchUpdateMeterCase) {
219             counters.getMeterCrudCounts().decUpdated();
220         } else if (batchChoice instanceof FlatBatchRemoveMeterCase) {
221             counters.getMeterCrudCounts().decRemoved();
222         }
223     }
224
225     static Map<Range<Integer>, Batch> mapBatchesToRanges(final List<Batch> inputBatchBag, final int failureIndexLimit) {
226         final Map<Range<Integer>, Batch> batchMap = new LinkedHashMap<>();
227         final PeekingIterator<Batch> batchPeekingIterator = Iterators.peekingIterator(inputBatchBag.iterator());
228         while (batchPeekingIterator.hasNext()) {
229             final Batch batch = batchPeekingIterator.next();
230             final int nextBatchOrder = batchPeekingIterator.hasNext()
231                     ? batchPeekingIterator.peek().getBatchOrder()
232                     : failureIndexLimit;
233             batchMap.put(Range.closed(batch.getBatchOrder(), nextBatchOrder - 1), batch);
234         }
235         return batchMap;
236     }
237
238     @VisibleForTesting
239     static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
240         // process flow remove
241         int order = batchOrder;
242         if (flowItemSyncTableMap != null) {
243             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
244                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
245
246                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
247                     final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
248                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
249                     int itemOrder = 0;
250                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
251                         flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(flow)
252                                 .setBatchOrder(itemOrder++)
253                                 .setFlowId(flow.getId())
254                                 .build());
255                     }
256                     final Batch batch = new BatchBuilder()
257                             .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
258                                     .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
259                                     .build())
260                             .setBatchOrder(order)
261                             .build();
262                     order += itemOrder;
263                     batchBag.add(batch);
264                 }
265             }
266         }
267         return order;
268     }
269
270     @VisibleForTesting
271     static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
272         // process group add+update
273         int order = batchOrder;
274         if (groupsToAddOrUpdate != null) {
275             for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
276                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
277                     final List<FlatBatchAddGroup> flatBatchAddGroupBag =
278                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
279                     int itemOrder = 0;
280                     for (Group group : groupItemSyncBox.getItemsToPush()) {
281                         flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group).setBatchOrder(itemOrder++).build());
282                     }
283                     final Batch batch = new BatchBuilder()
284                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
285                                     .setFlatBatchAddGroup(flatBatchAddGroupBag)
286                                     .build())
287                             .setBatchOrder(order)
288                             .build();
289                     order += itemOrder;
290                     batchBag.add(batch);
291                 }
292
293                 if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
294                     final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
295                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
296                     int itemOrder = 0;
297                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
298                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
299                                 .setBatchOrder(itemOrder++)
300                                 .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
301                                 .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
302                                 .build());
303                     }
304                     final Batch batch = new BatchBuilder()
305                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
306                                     .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
307                                     .build())
308                             .setBatchOrder(order)
309                             .build();
310                     order += itemOrder;
311                     batchBag.add(batch);
312                 }
313             }
314         }
315         return order;
316     }
317
318     @VisibleForTesting
319     static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
320         // process group add+update
321         int order = batchOrder;
322         if (groupsToRemoveOrUpdate != null) {
323             for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
324                 if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
325                     final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
326                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
327                     int itemOrder = 0;
328                     for (Group group : groupItemSyncBox.getItemsToPush()) {
329                         flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group).setBatchOrder(itemOrder++).build());
330                     }
331                     final Batch batch = new BatchBuilder()
332                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
333                                     .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
334                                     .build())
335                             .setBatchOrder(order)
336                             .build();
337                     order += itemOrder;
338                     batchBag.add(batch);
339                 }
340             }
341         }
342         return order;
343     }
344
345     @VisibleForTesting
346     static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
347         // process meter add+update
348         int order = batchOrder;
349         if (meterItemSyncBox != null) {
350             if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
351                 final List<FlatBatchAddMeter> flatBatchAddMeterBag =
352                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
353                 int itemOrder = 0;
354                 for (Meter meter : meterItemSyncBox.getItemsToPush()) {
355                     flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
356                 }
357                 final Batch batch = new BatchBuilder()
358                         .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
359                                 .setFlatBatchAddMeter(flatBatchAddMeterBag)
360                                 .build())
361                         .setBatchOrder(order)
362                         .build();
363                 order += itemOrder;
364                 batchBag.add(batch);
365             }
366
367             if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
368                 final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
369                         new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
370                 int itemOrder = 0;
371                 for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
372                     flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
373                             .setBatchOrder(itemOrder++)
374                             .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
375                             .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
376                             .build());
377                 }
378                 final Batch batch = new BatchBuilder()
379                         .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
380                                 .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
381                                 .build())
382                         .setBatchOrder(order)
383                         .build();
384                 order += itemOrder;
385                 batchBag.add(batch);
386             }
387         }
388         return order;
389     }
390
391     @VisibleForTesting
392     static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
393         // process meter remove
394         int order = batchOrder;
395         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
396             final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
397                     new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
398             int itemOrder = 0;
399             for (Meter meter : meterItemSyncBox.getItemsToPush()) {
400                 flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
401             }
402             final Batch batch = new BatchBuilder()
403                     .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
404                             .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
405                             .build())
406                     .setBatchOrder(order)
407                     .build();
408             order += itemOrder;
409             batchBag.add(batch);
410         }
411         return order;
412     }
413
414     @VisibleForTesting
415     static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
416         // process flow add+update
417         int order = batchOrder;
418         if (flowItemSyncTableMap != null) {
419             for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
420                 final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
421
422                 if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
423                     final List<FlatBatchAddFlow> flatBatchAddFlowBag =
424                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
425                     int itemOrder = 0;
426                     for (Flow flow : flowItemSyncBox.getItemsToPush()) {
427                         flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(flow)
428                                 .setBatchOrder(itemOrder++)
429                                 .setFlowId(flow.getId())
430                                 .build());
431                     }
432                     final Batch batch = new BatchBuilder()
433                             .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
434                                     .setFlatBatchAddFlow(flatBatchAddFlowBag)
435                                     .build())
436                             .setBatchOrder(order)
437                             .build();
438                     order += itemOrder;
439                     batchBag.add(batch);
440                 }
441
442                 if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
443                     final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
444                             new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
445                     int itemOrder = 0;
446                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
447                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
448                                 .setBatchOrder(itemOrder++)
449                                 .setFlowId(flowUpdate.getUpdated().getId())
450                                 .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
451                                 .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
452                                 .build());
453                     }
454                     final Batch batch = new BatchBuilder()
455                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
456                                     .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
457                                     .build())
458                             .setBatchOrder(order)
459                             .build();
460                     order += itemOrder;
461                     batchBag.add(batch);
462                 }
463             }
464         }
465         return order;
466     }
467
468     public SyncPlanPushStrategyFlatBatchImpl setFlatBatchService(final SalFlatBatchService flatBatchService) {
469         this.flatBatchService = flatBatchService;
470         return this;
471     }
472
473     public SyncPlanPushStrategyFlatBatchImpl setTableForwarder(final TableForwarder tableForwarder) {
474         this.tableForwarder = tableForwarder;
475         return this;
476     }
477 }