Bug 5575 added strategy implementation for flat-batch + tests 27/38327/7
authorAndrej Leitner <anleitne@cisco.com>
Tue, 3 May 2016 12:31:13 +0000 (14:31 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Tue, 24 May 2016 06:32:07 +0000 (08:32 +0200)
Change-Id: I667485c13d4fffd166648c339f8554a2680ccd38
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImpl.java [new file with mode: 0644]
applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/util/ReconcileUtil.java
applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImplTest.java [new file with mode: 0644]

diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImpl.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImpl.java
new file mode 100644 (file)
index 0000000..7db755e
--- /dev/null
@@ -0,0 +1,378 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
+import org.opendaylight.openflowplugin.applications.frsync.impl.TableForwarder;
+import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncCrudCounters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.flow._case.FlatBatchAddFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroup;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.group._case.FlatBatchAddGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.add.meter._case.FlatBatchAddMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.flow._case.FlatBatchRemoveFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroup;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.group._case.FlatBatchRemoveGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.remove.meter._case.FlatBatchRemoveMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroup;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Execute CRUD API for flow + group + meter involving flat-batch strategy.
+ */
+public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyFlatBatchImpl.class);
+
+    private SalFlatBatchService flatBatchService;
+    private TableForwarder tableForwarder;
+
+    @Override
+    public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
+                                                                 final SynchronizationDiffInput diffInput,
+                                                                 final SyncCrudCounters counters) {
+        final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
+        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+
+        /* Tables - have to be pushed before groups */
+        // TODO enable table-update when ready
+        //resultVehicle = updateTableFeatures(nodeIdent, configTree);
+
+        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+            @Override
+            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
+                if (!input.isSuccessful()) {
+                    //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
+                    //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
+                    //        Futures.asList Arrays.asList(input, output),
+                    //        ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
+                }
+
+                final List<Batch> batchBag = new ArrayList<>();
+                int batchOrder = 0;
+
+                batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
+                batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
+                batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
+
+                batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
+                batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
+                batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
+
+                LOG.trace("Index of last batch step: {}", batchOrder);
+
+                final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
+                        .setNode(new NodeRef(diffInput.getNodeIdent()))
+                        .setExitOnFirstError(false) // TODO: propagate from input
+                        .setBatch(batchBag)
+                        .build();
+
+                final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
+
+                return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
+                        ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-bulk"));
+            }
+        });
+
+        Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "removeRedundantGroups"));
+        return resultVehicle;
+    }
+
+    @VisibleForTesting
+    static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+        // process flow remove
+        if (flowItemSyncTableMap != null) {
+            for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
+                final TableKey tableKey = syncBoxEntry.getKey();
+                final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
+
+                if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
+                    final List<FlatBatchRemoveFlow> flatBatchRemoveFlowBag =
+                            new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
+                        flatBatchRemoveFlowBag.add(new FlatBatchRemoveFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchRemoveFlowCaseBuilder()
+                                    .setFlatBatchRemoveFlow(flatBatchRemoveFlowBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+            }
+        }
+        return batchOrder;
+    }
+
+    @VisibleForTesting
+    static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
+        // process group add+update
+        if (groupsToAddOrUpdate != null) {
+            for (ItemSyncBox<Group> groupItemSyncBox : groupsToAddOrUpdate) {
+                if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
+                    final List<FlatBatchAddGroup> flatBatchAddGroupBag =
+                            new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (Group group : groupItemSyncBox.getItemsToPush()) {
+                        flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group).setBatchOrder(itemOrder++).build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
+                                    .setFlatBatchAddGroup(flatBatchAddGroupBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+
+                if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
+                    final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
+                            new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
+                        flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
+                                .setBatchOrder(itemOrder++)
+                                .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
+                                .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
+                                .build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
+                                    .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+            }
+        }
+        return batchOrder;
+    }
+
+    @VisibleForTesting
+    static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
+        // process group add+update
+        if (groupsToRemoveOrUpdate != null) {
+            for (ItemSyncBox<Group> groupItemSyncBox : groupsToRemoveOrUpdate) {
+                if (!groupItemSyncBox.getItemsToPush().isEmpty()) {
+                    final List<FlatBatchRemoveGroup> flatBatchRemoveGroupBag =
+                            new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (Group group : groupItemSyncBox.getItemsToPush()) {
+                        flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group).setBatchOrder(itemOrder++).build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
+                                    .setFlatBatchRemoveGroup(flatBatchRemoveGroupBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+
+                if (!groupItemSyncBox.getItemsToUpdate().isEmpty()) {
+                    final List<FlatBatchUpdateGroup> flatBatchUpdateGroupBag =
+                            new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
+                        flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
+                                .setBatchOrder(itemOrder++)
+                                .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
+                                .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
+                                .build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
+                                    .setFlatBatchUpdateGroup(flatBatchUpdateGroupBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+            }
+        }
+        return batchOrder;
+    }
+
+    @VisibleForTesting
+    static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+        // process meter add+update
+        if (meterItemSyncBox != null) {
+            if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
+                final List<FlatBatchAddMeter> flatBatchAddMeterBag =
+                        new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
+                int itemOrder = 0;
+                for (Meter meter : meterItemSyncBox.getItemsToPush()) {
+                    flatBatchAddMeterBag.add(new FlatBatchAddMeterBuilder(meter).setBatchOrder(itemOrder++).build());
+                }
+                final Batch batch = new BatchBuilder()
+                        .setBatchChoice(new FlatBatchAddMeterCaseBuilder()
+                                .setFlatBatchAddMeter(flatBatchAddMeterBag)
+                                .build())
+                        .setBatchOrder(batchOrder++)
+                        .build();
+                batchBag.add(batch);
+            }
+
+            if (!meterItemSyncBox.getItemsToUpdate().isEmpty()) {
+                final List<FlatBatchUpdateMeter> flatBatchUpdateMeterBag =
+                        new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
+                int itemOrder = 0;
+                for (ItemSyncBox.ItemUpdateTuple<Meter> meterUpdate : meterItemSyncBox.getItemsToUpdate()) {
+                    flatBatchUpdateMeterBag.add(new FlatBatchUpdateMeterBuilder()
+                            .setBatchOrder(itemOrder++)
+                            .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(meterUpdate.getOriginal()).build())
+                            .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(meterUpdate.getUpdated()).build())
+                            .build());
+                }
+                final Batch batch = new BatchBuilder()
+                        .setBatchChoice(new FlatBatchUpdateMeterCaseBuilder()
+                                .setFlatBatchUpdateMeter(flatBatchUpdateMeterBag)
+                                .build())
+                        .setBatchOrder(batchOrder++)
+                        .build();
+                batchBag.add(batch);
+            }
+        }
+        return batchOrder;
+    }
+
+    @VisibleForTesting
+    static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+        // process meter remove
+        if (meterItemSyncBox != null) {
+            if (!meterItemSyncBox.getItemsToPush().isEmpty()) {
+                final List<FlatBatchRemoveMeter> flatBatchRemoveMeterBag =
+                        new ArrayList<>(meterItemSyncBox.getItemsToUpdate().size());
+                int itemOrder = 0;
+                for (Meter meter : meterItemSyncBox.getItemsToPush()) {
+                    flatBatchRemoveMeterBag.add(new FlatBatchRemoveMeterBuilder(meter).setBatchOrder(itemOrder++).build());
+                }
+                final Batch batch = new BatchBuilder()
+                        .setBatchChoice(new FlatBatchRemoveMeterCaseBuilder()
+                                .setFlatBatchRemoveMeter(flatBatchRemoveMeterBag)
+                                .build())
+                        .setBatchOrder(batchOrder++)
+                        .build();
+                batchBag.add(batch);
+            }
+        }
+        return batchOrder;
+    }
+
+    @VisibleForTesting
+    static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+        // process flow add+update
+        if (flowItemSyncTableMap != null) {
+            for (Map.Entry<TableKey, ItemSyncBox<Flow>> syncBoxEntry : flowItemSyncTableMap.entrySet()) {
+                final TableKey tableKey = syncBoxEntry.getKey();
+                final ItemSyncBox<Flow> flowItemSyncBox = syncBoxEntry.getValue();
+
+                if (!flowItemSyncBox.getItemsToPush().isEmpty()) {
+                    final List<FlatBatchAddFlow> flatBatchAddFlowBag =
+                            new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (Flow Flow : flowItemSyncBox.getItemsToPush()) {
+                        flatBatchAddFlowBag.add(new FlatBatchAddFlowBuilder(Flow).setBatchOrder(itemOrder++).build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchAddFlowCaseBuilder()
+                                    .setFlatBatchAddFlow(flatBatchAddFlowBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+
+                if (!flowItemSyncBox.getItemsToUpdate().isEmpty()) {
+                    final List<FlatBatchUpdateFlow> flatBatchUpdateFlowBag =
+                            new ArrayList<>(flowItemSyncBox.getItemsToUpdate().size());
+                    int itemOrder = 0;
+                    for (ItemSyncBox.ItemUpdateTuple<Flow> FlowUpdate : flowItemSyncBox.getItemsToUpdate()) {
+                        flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
+                                .setBatchOrder(itemOrder++)
+                                .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(FlowUpdate.getOriginal()).build())
+                                .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(FlowUpdate.getUpdated()).build())
+                                .build());
+                    }
+                    final Batch batch = new BatchBuilder()
+                            .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()
+                                    .setFlatBatchUpdateFlow(flatBatchUpdateFlowBag)
+                                    .build())
+                            .setBatchOrder(batchOrder++)
+                            .build();
+                    batchBag.add(batch);
+                }
+            }
+        }
+        return batchOrder;
+    }
+
+    public void setFlatBatchService(final SalFlatBatchService flatBatchService) {
+        this.flatBatchService = flatBatchService;
+    }
+
+    public void setTableForwarder(final TableForwarder tableForwarder) {
+        this.tableForwarder = tableForwarder;
+    }
+}
index 913606ccabd7fa81d5e5a1b2457b39ee4a71ed81..e4a12499645583662d7556f48269089a43b98a87 100644 (file)
@@ -16,7 +16,16 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
@@ -41,16 +50,6 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
 /**
  * Util methods for group reconcil task (future chaining, transforms).
  */
@@ -92,6 +91,36 @@ public class ReconcileUtil {
         };
     }
 
+    /**
+     * @param actionDescription description for case when the triggering future contains failure
+     * @param <D>               type of rpc output (gathered in list)
+     * @return single rpc result of type Void honoring all partial rpc results
+     */
+    public static <D> Function<RpcResult<D>, RpcResult<Void>> createRpcResultToVoidFunction(final String actionDescription) {
+        return new Function<RpcResult<D>, RpcResult<Void>>() {
+            @Nullable
+            @Override
+            public RpcResult<Void> apply(@Nullable final RpcResult<D> input) {
+                final RpcResultBuilder<Void> resultSink;
+                if (input != null) {
+                    List<RpcError> errors = new ArrayList<>();
+                    if (!input.isSuccessful()) {
+                        errors.addAll(input.getErrors());
+                        resultSink = RpcResultBuilder.<Void>failed().withRpcErrors(errors);
+                    } else {
+                        resultSink = RpcResultBuilder.success();
+                    }
+                } else {
+                    resultSink = RpcResultBuilder.<Void>failed()
+                            .withError(RpcError.ErrorType.APPLICATION, "action of " + actionDescription + " failed");
+
+                }
+
+                return resultSink.build();
+            }
+        };
+    }
+
     /**
      * @param nodeIdent                     flow capable node path - target device for routed rpc
      * @param flowCapableTransactionService barrier rpc service
diff --git a/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImplTest.java b/applications/forwardingrules-sync/src/test/java/org/opendaylight/openflowplugin/applications/frsync/impl/strategy/SyncPlanPushStrategyFlatBatchImplTest.java
new file mode 100644 (file)
index 0000000..337a5ee
--- /dev/null
@@ -0,0 +1,265 @@
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl.strategy;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddGroupCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddMeterCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveFlowCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveGroupCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchRemoveMeterCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SyncPlanPushStrategyFlatBatchImpl}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncPlanPushStrategyFlatBatchImplTest {
+
+    private static final NodeId NODE_ID = new NodeId("ut-node-id");
+    private static final InstanceIdentifier<FlowCapableNode> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
+            .child(Node.class, new NodeKey(NODE_ID))
+            .augmentation(FlowCapableNode.class);
+    @Mock
+    private SalFlatBatchService flatBatchService;
+    @Mock
+    private SalFlatBatchService tableUpdateService;
+
+    private List<ItemSyncBox<Group>> groupsToAddOrUpdate;
+    private List<ItemSyncBox<Group>> groupsToRemove;
+    private ItemSyncBox<Meter> metersToAddOrUpdate;
+    private ItemSyncBox<Meter> metersToRemove;
+    private Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
+    private Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
+    private List<Batch> batchBag;
+
+    private SyncPlanPushStrategyFlatBatchImpl syncPlanPushStrategy;
+
+    public SyncPlanPushStrategyFlatBatchImplTest() {
+        groupsToAddOrUpdate = Lists.newArrayList(createGroupSyncBox(1, 2, 3), createGroupSyncBoxWithUpdates(4, 5, 6));
+        groupsToRemove = Lists.newArrayList(createGroupSyncBox(1, 2, 3), createGroupSyncBox(4, 5, 6));
+
+        metersToAddOrUpdate = createMeterSyncBoxWithUpdates(1, 2, 3);
+        metersToRemove = createMeterSyncBox(1, 2, 3);
+
+        flowsToAddOrUpdate = new HashMap<>();
+        flowsToAddOrUpdate.put(new TableKey((short) 0), createFlowSyncBox("1", "2", "3"));
+        flowsToAddOrUpdate.put(new TableKey((short) 1), createFlowSyncBoxWithUpdates("4", "5", "6"));
+        flowsToRemove = new HashMap<>();
+        flowsToRemove.put(new TableKey((short) 0), createFlowSyncBox("1", "2", "3"));
+        flowsToRemove.put(new TableKey((short) 1), createFlowSyncBox("4", "5", "6"));
+    }
+
+    private ItemSyncBox<Group> createGroupSyncBox(final long... groupIDs) {
+        final ItemSyncBox<Group> groupBox = new ItemSyncBox<>();
+
+        for (long gid : groupIDs) {
+            groupBox.getItemsToPush().add(createPlainGroup(gid));
+        }
+        return groupBox;
+    }
+
+    private ItemSyncBox<Group> createGroupSyncBoxWithUpdates(final long... groupIDs) {
+        final ItemSyncBox<Group> groupBox = new ItemSyncBox<>();
+
+        for (long gid : groupIDs) {
+            groupBox.getItemsToPush().add(createPlainGroup(gid));
+            groupBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(createPlainGroup(gid),
+                    createPlainGroup(gid + 100)));
+        }
+        return groupBox;
+    }
+
+    private Group createPlainGroup(final long gid) {
+        return new GroupBuilder().setGroupId(new GroupId(gid)).build();
+    }
+
+    private ItemSyncBox<Meter> createMeterSyncBox(final long... meterIDs) {
+        final ItemSyncBox<Meter> groupBox = new ItemSyncBox<>();
+
+        for (long gid : meterIDs) {
+            groupBox.getItemsToPush().add(createPlainMeter(gid));
+        }
+        return groupBox;
+    }
+
+    private ItemSyncBox<Meter> createMeterSyncBoxWithUpdates(final long... meterIDs) {
+        final ItemSyncBox<Meter> groupBox = new ItemSyncBox<>();
+
+        for (long mid : meterIDs) {
+            groupBox.getItemsToPush().add(createPlainMeter(mid));
+            groupBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(createPlainMeter(mid),
+                    createPlainMeter(mid + 100)));
+        }
+        return groupBox;
+    }
+
+    private Meter createPlainMeter(final long mid) {
+        return new MeterBuilder().setMeterId(new MeterId(mid)).build();
+    }
+
+    private ItemSyncBox<Flow> createFlowSyncBox(final String... flowIDs) {
+        final ItemSyncBox<Flow> flowBox = new ItemSyncBox<>();
+
+        for (String fid : flowIDs) {
+            flowBox.getItemsToPush().add(createPlainFlow(fid));
+        }
+        return flowBox;
+    }
+
+    private ItemSyncBox<Flow> createFlowSyncBoxWithUpdates(final String... flowIDs) {
+        final ItemSyncBox<Flow> groupBox = new ItemSyncBox<>();
+
+        for (String fid : flowIDs) {
+            groupBox.getItemsToPush().add(createPlainFlow(fid));
+            groupBox.getItemsToUpdate().add(new ItemSyncBox.ItemUpdateTuple<>(createPlainFlow(fid),
+                    createPlainFlow(fid + "upd")));
+        }
+        return groupBox;
+    }
+
+    private Flow createPlainFlow(final String fid) {
+        return new FlowBuilder().setId(new FlowId(fid)).build();
+    }
+
+
+    @Before
+    public void setUp() throws Exception {
+        syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl();
+        syncPlanPushStrategy.setFlatBatchService(flatBatchService);
+        syncPlanPushStrategy.setFlatBatchService(tableUpdateService);
+
+        batchBag = new ArrayList<>();
+    }
+
+    @Test
+    public void testExecuteSyncStrategy() throws Exception {
+        final SynchronizationDiffInput diffInput = new SynchronizationDiffInput(NODE_IDENT,
+                groupsToAddOrUpdate, null, null, null, null, null);
+    }
+
+    @Test
+    public void testAssembleRemoveFlows() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleRemoveFlows(batchBag, 0, flowsToRemove);
+
+        Assert.assertEquals(2, lastOrder);
+        Assert.assertEquals(2, batchBag.size());
+        Assert.assertEquals(FlatBatchRemoveFlowCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchRemoveFlowCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchRemoveFlow().size());
+        Assert.assertEquals(FlatBatchRemoveFlowCase.class, batchBag.get(1).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchRemoveFlowCase) batchBag.get(1).getBatchChoice())
+                .getFlatBatchRemoveFlow().size());
+    }
+
+    @Test
+    public void testAssembleAddOrUpdateGroups() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleAddOrUpdateGroups(batchBag, 0, groupsToAddOrUpdate);
+
+        Assert.assertEquals(3, lastOrder);
+        Assert.assertEquals(3, batchBag.size());
+        Assert.assertEquals(FlatBatchAddGroupCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchAddGroupCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchAddGroup().size());
+        Assert.assertEquals(FlatBatchAddGroupCase.class, batchBag.get(1).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchAddGroupCase) batchBag.get(1).getBatchChoice())
+                .getFlatBatchAddGroup().size());
+        Assert.assertEquals(FlatBatchUpdateGroupCase.class, batchBag.get(2).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchUpdateGroupCase) batchBag.get(2).getBatchChoice())
+                .getFlatBatchUpdateGroup().size());
+    }
+
+    @Test
+    public void testAssembleRemoveGroups() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleRemoveGroups(batchBag, 0, groupsToRemove);
+
+        Assert.assertEquals(2, lastOrder);
+        Assert.assertEquals(2, batchBag.size());
+        Assert.assertEquals(FlatBatchRemoveGroupCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchRemoveGroupCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchRemoveGroup().size());
+        Assert.assertEquals(FlatBatchRemoveGroupCase.class, batchBag.get(1).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchRemoveGroupCase) batchBag.get(1).getBatchChoice())
+                .getFlatBatchRemoveGroup().size());
+    }
+
+    @Test
+    public void testAssembleAddOrUpdateMeters() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleAddOrUpdateMeters(batchBag, 0, metersToAddOrUpdate);
+
+        Assert.assertEquals(2, lastOrder);
+        Assert.assertEquals(2, batchBag.size());
+        Assert.assertEquals(FlatBatchAddMeterCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchAddMeterCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchAddMeter().size());
+        Assert.assertEquals(FlatBatchUpdateMeterCase.class, batchBag.get(1).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchUpdateMeterCase) batchBag.get(1).getBatchChoice())
+                .getFlatBatchUpdateMeter().size());
+    }
+
+    @Test
+    public void testAssembleRemoveMeters() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleRemoveMeters(batchBag, 0, metersToRemove);
+
+        Assert.assertEquals(1, lastOrder);
+        Assert.assertEquals(1, batchBag.size());
+        Assert.assertEquals(FlatBatchRemoveMeterCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchRemoveMeterCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchRemoveMeter().size());
+    }
+
+    @Test
+    public void testAssembleAddOrUpdateFlows() throws Exception {
+        final int lastOrder = SyncPlanPushStrategyFlatBatchImpl.assembleAddOrUpdateFlows(batchBag, 0, flowsToAddOrUpdate);
+
+        Assert.assertEquals(3, lastOrder);
+        Assert.assertEquals(3, batchBag.size());
+        Assert.assertEquals(FlatBatchAddFlowCase.class, batchBag.get(0).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchAddFlowCase) batchBag.get(0).getBatchChoice())
+                .getFlatBatchAddFlow().size());
+        Assert.assertEquals(FlatBatchUpdateFlowCase.class, batchBag.get(1).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchUpdateFlowCase) batchBag.get(1).getBatchChoice())
+                .getFlatBatchUpdateFlow().size());
+        Assert.assertEquals(FlatBatchAddFlowCase.class, batchBag.get(2).getBatchChoice().getImplementedInterface());
+        Assert.assertEquals(3, ((FlatBatchAddFlowCase) batchBag.get(2).getBatchChoice())
+                .getFlatBatchAddFlow().size());
+    }
+}
\ No newline at end of file