Bump odlparent to 5.0.0
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / strategy / SyncPlanPushStrategyFlatBatchImpl.java
index 68e0803723c532b7e7a9a6de1f9fe51cc3225165..900d7e8b666610aa7b7c75d67d22f1ba8ceaf025 100644 (file)
@@ -12,19 +12,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Range;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
-import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
 import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
 import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
@@ -73,7 +71,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev16032
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
@@ -82,11 +79,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.bat
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,16 +100,15 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
                                                                  final SynchronizationDiffInput diffInput,
                                                                  final SyncCrudCounters counters) {
-        final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
-        final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
-
         // prepare default (full) counts
         counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
         counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
         counters.getGroupCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getGroupsToRemove()));
 
-        counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getFlowsToAddOrUpdate().values()));
-        counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getFlowsToAddOrUpdate().values()));
+        counters.getFlowCrudCounts().setAdded(ReconcileUtil.countTotalPushed(
+                diffInput.getFlowsToAddOrUpdate().values()));
+        counters.getFlowCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(
+                diffInput.getFlowsToAddOrUpdate().values()));
         counters.getFlowCrudCounts().setRemoved(ReconcileUtil.countTotalPushed(diffInput.getFlowsToRemove().values()));
 
         counters.getMeterCrudCounts().setAdded(diffInput.getMetersToAddOrUpdate().getItemsToPush().size());
@@ -125,44 +119,39 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
         // TODO enable table-update when ready
         //resultVehicle = updateTableFeatures(nodeIdent, configTree);
 
-        resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
-            @Override
-            public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
-                final List<Batch> batchBag = new ArrayList<>();
-                int batchOrder = 0;
-
-                batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
-                batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
-                batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
-
-                batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
-                batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
-                batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
+        resultVehicle = Futures.transformAsync(resultVehicle, input -> {
+            final List<Batch> batchBag = new ArrayList<>();
+            int batchOrder = 0;
 
-                LOG.trace("Index of last batch step: {}", batchOrder);
+            batchOrder = assembleAddOrUpdateGroups(batchBag, batchOrder, diffInput.getGroupsToAddOrUpdate());
+            batchOrder = assembleAddOrUpdateMeters(batchBag, batchOrder, diffInput.getMetersToAddOrUpdate());
+            batchOrder = assembleAddOrUpdateFlows(batchBag, batchOrder, diffInput.getFlowsToAddOrUpdate());
 
-                final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
-                        .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
-                        // TODO: propagate from input
-                        .setExitOnFirstError(false)
-                        .setBatch(batchBag)
-                        .build();
+            batchOrder = assembleRemoveFlows(batchBag, batchOrder, diffInput.getFlowsToRemove());
+            batchOrder = assembleRemoveMeters(batchBag, batchOrder, diffInput.getMetersToRemove());
+            batchOrder = assembleRemoveGroups(batchBag, batchOrder, diffInput.getGroupsToRemove());
 
-                final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
+            LOG.trace("Index of last batch step: {}", batchOrder);
 
-                final int failureIndexLimit = batchOrder;
+            final ProcessFlatBatchInput flatBatchInput = new ProcessFlatBatchInputBuilder()
+                    .setNode(new NodeRef(PathUtil.digNodePath(diffInput.getNodeIdent())))
+                    // TODO: propagate from input
+                    .setExitOnFirstError(false)
+                    .setBatch(batchBag)
+                    .build();
 
-                if (LOG.isDebugEnabled()) {
-                    Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
-                            createCounterCallback(batchBag, failureIndexLimit, counters));
-                }
+            final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture =
+                    flatBatchService.processFlatBatch(flatBatchInput);
 
-                return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
-                        ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
+            if (LOG.isDebugEnabled()) {
+                Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
+                        createCounterCallback(batchBag, batchOrder, counters), MoreExecutors.directExecutor());
             }
-        });
 
-        Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "flat-batch"));
+            return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
+                    ReconcileUtil.createRpcResultToVoidFunction("flat-batch"),
+                    MoreExecutors.directExecutor());
+        }, MoreExecutors.directExecutor());
         return resultVehicle;
     }
 
@@ -171,15 +160,16 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                                                                                     final SyncCrudCounters counters) {
         return new FutureCallback<RpcResult<ProcessFlatBatchOutput>>() {
             @Override
-            public void onSuccess(@Nullable final RpcResult<ProcessFlatBatchOutput> result) {
-                if (!result.isSuccessful() && result.getResult() != null && !result.getResult().getBatchFailure().isEmpty()) {
+            public void onSuccess(final RpcResult<ProcessFlatBatchOutput> result) {
+                if (!result.isSuccessful() && result.getResult() != null
+                        && !result.getResult().getBatchFailure().isEmpty()) {
                     Map<Range<Integer>, Batch> batchMap = mapBatchesToRanges(inputBatchBag, failureIndexLimit);
                     decrementBatchFailuresCounters(result.getResult().getBatchFailure(), batchMap, counters);
                 }
             }
 
             @Override
-            public void onFailure(final Throwable t) {
+            public void onFailure(final Throwable failure) {
                 counters.resetAll();
             }
         };
@@ -236,7 +226,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+    static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder,
+            final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
         // process flow remove
         int order = batchOrder;
         if (flowItemSyncTableMap != null) {
@@ -268,7 +259,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
+    static int assembleAddOrUpdateGroups(final List<Batch> batchBag, int batchOrder,
+            final List<ItemSyncBox<Group>> groupsToAddOrUpdate) {
         // process group add+update
         int order = batchOrder;
         if (groupsToAddOrUpdate != null) {
@@ -278,7 +270,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
                     int itemOrder = 0;
                     for (Group group : groupItemSyncBox.getItemsToPush()) {
-                        flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group).setBatchOrder(itemOrder++).build());
+                        flatBatchAddGroupBag.add(new FlatBatchAddGroupBuilder(group)
+                                .setBatchOrder(itemOrder++).build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchAddGroupCaseBuilder()
@@ -296,10 +289,10 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                     int itemOrder = 0;
                     for (ItemSyncBox.ItemUpdateTuple<Group> groupUpdate : groupItemSyncBox.getItemsToUpdate()) {
                         flatBatchUpdateGroupBag.add(new FlatBatchUpdateGroupBuilder()
-                                .setBatchOrder(itemOrder++)
-                                .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
-                                .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
-                                .build());
+                            .setBatchOrder(itemOrder++)
+                            .setOriginalBatchedGroup(new OriginalBatchedGroupBuilder(groupUpdate.getOriginal()).build())
+                            .setUpdatedBatchedGroup(new UpdatedBatchedGroupBuilder(groupUpdate.getUpdated()).build())
+                            .build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchUpdateGroupCaseBuilder()
@@ -316,7 +309,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder, final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
+    static int assembleRemoveGroups(final List<Batch> batchBag, int batchOrder,
+            final List<ItemSyncBox<Group>> groupsToRemoveOrUpdate) {
         // process group add+update
         int order = batchOrder;
         if (groupsToRemoveOrUpdate != null) {
@@ -326,7 +320,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                             new ArrayList<>(groupItemSyncBox.getItemsToUpdate().size());
                     int itemOrder = 0;
                     for (Group group : groupItemSyncBox.getItemsToPush()) {
-                        flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group).setBatchOrder(itemOrder++).build());
+                        flatBatchRemoveGroupBag.add(new FlatBatchRemoveGroupBuilder(group)
+                                .setBatchOrder(itemOrder++).build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchRemoveGroupCaseBuilder()
@@ -343,7 +338,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+    static int assembleAddOrUpdateMeters(final List<Batch> batchBag, int batchOrder,
+            final ItemSyncBox<Meter> meterItemSyncBox) {
         // process meter add+update
         int order = batchOrder;
         if (meterItemSyncBox != null) {
@@ -389,7 +385,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder, final ItemSyncBox<Meter> meterItemSyncBox) {
+    static int assembleRemoveMeters(final List<Batch> batchBag, int batchOrder,
+            final ItemSyncBox<Meter> meterItemSyncBox) {
         // process meter remove
         int order = batchOrder;
         if (meterItemSyncBox != null && !meterItemSyncBox.getItemsToPush().isEmpty()) {
@@ -412,7 +409,8 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
     }
 
     @VisibleForTesting
-    static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
+    static int assembleAddOrUpdateFlows(final List<Batch> batchBag, int batchOrder,
+            final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
         // process flow add+update
         int order = batchOrder;
         if (flowItemSyncTableMap != null) {
@@ -445,11 +443,11 @@ public class SyncPlanPushStrategyFlatBatchImpl implements SyncPlanPushStrategy {
                     int itemOrder = 0;
                     for (ItemSyncBox.ItemUpdateTuple<Flow> flowUpdate : flowItemSyncBox.getItemsToUpdate()) {
                         flatBatchUpdateFlowBag.add(new FlatBatchUpdateFlowBuilder()
-                                .setBatchOrder(itemOrder++)
-                                .setFlowId(flowUpdate.getUpdated().getId())
-                                .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
-                                .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
-                                .build());
+                            .setBatchOrder(itemOrder++)
+                            .setFlowId(flowUpdate.getUpdated().getId())
+                            .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(flowUpdate.getOriginal()).build())
+                            .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(flowUpdate.getUpdated()).build())
+                            .build());
                     }
                     final Batch batch = new BatchBuilder()
                             .setBatchChoice(new FlatBatchUpdateFlowCaseBuilder()