Bug 5578 Improve frsync (cleaning) 84/41384/2
authorAndrej Leitner <anleitne@cisco.com>
Wed, 6 Jul 2016 07:53:44 +0000 (09:53 +0200)
committerAndrej Leitner <anleitne@cisco.com>
Wed, 6 Jul 2016 09:47:15 +0000 (11:47 +0200)
 - method for RPC results merge moved into utils
 - updated tests
 - renamed methods according to functionality changes
 - improved comments

Change-Id: I705941d48e7f459cea9df61487b6a4392480b54f
Signed-off-by: Andrej Leitner <anleitne@cisco.com>
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlatBatchServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/batch/BatchStepJob.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchFlowAdapters.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchGroupAdapters.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchMeterAdapters.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/FlatBatchUtil.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchFlowAdaptersTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchGroupAdaptersTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/batch/FlatBatchMeterAdaptersTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/FlatBatchUtilTest.java

index d700c9a8f7180fea26c3c6fc13c2f98523717bb3..69647dd4f79864afdabe6ab4699f9e5ddb267323 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.services;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
@@ -27,7 +26,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev16032
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
@@ -51,7 +49,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.Re
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -105,41 +102,13 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
             // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
             if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size()-1)) {
                 firedJobs.add(0, chainSummaryResult);
-                chainSummaryResult = mergeJobsResultsFutures(firedJobs);
+                chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
                 firedJobs.clear();
             }
         }
         return chainSummaryResult;
     }
 
-    private static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> mergeJobsResultsFutures(
-            final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs) {
-
-        ListenableFuture<List<RpcResult<ProcessFlatBatchOutput>>> jobs = Futures.successfulAsList(firedJobs);
-
-        return Futures.transform(jobs, new AsyncFunction<List<RpcResult<ProcessFlatBatchOutput>>, RpcResult<ProcessFlatBatchOutput>>() {
-            @Override
-            public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(List<RpcResult<ProcessFlatBatchOutput>> jobsResults) {
-                boolean isSuccessful = true;
-                List<RpcError> rpcErrors = new ArrayList<>();
-                List<BatchFailure> batchFailures = new ArrayList<>();
-
-                for (RpcResult<ProcessFlatBatchOutput> jobResult : jobsResults) {
-                    if (jobResult != null) {
-                        isSuccessful = (isSuccessful && jobResult.isSuccessful());
-                        rpcErrors.addAll(jobResult.getErrors());
-                        batchFailures.addAll(jobResult.getResult().getBatchFailure());
-                    }
-                }
-
-                return RpcResultBuilder.<ProcessFlatBatchOutput>status(isSuccessful)
-                        .withRpcErrors(rpcErrors)
-                        .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(batchFailures).build())
-                        .buildFuture();
-            }
-        });
-    }
-
     @VisibleForTesting
     List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan, final NodeRef node, final boolean exitOnFirstError) {
         // create batch API calls based on plan steps
@@ -147,76 +116,73 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
         int stepOffset = 0;
         for (final BatchPlanStep planStep : batchPlan) {
             final int currentOffset = stepOffset;
-            chainJobs.add(new BatchStepJob(planStep, new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
-                @Override
-                public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) {
-                    if (exitOnFirstError && !chainInput.isSuccessful()) {
-                        LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
-                        return RpcResultBuilder.<ProcessFlatBatchOutput>status(false)
-                                               .withRpcErrors(new ArrayList<>())
-                                               .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
-                                               .buildFuture();
-                    }
-
-                    LOG.trace("batch progressing on step type {}", planStep.getStepType());
-                    LOG.trace("batch progressing previous steps result: {}", chainInput.isSuccessful());
+            chainJobs.add(new BatchStepJob(planStep, chainInput -> {
+                if (exitOnFirstError && !chainInput.isSuccessful()) {
+                    LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
+                    return RpcResultBuilder.<ProcessFlatBatchOutput>status(false)
+                                           .withRpcErrors(new ArrayList<>())
+                                           .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
+                                           .buildFuture();
+                }
 
-                    final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
-                    switch (planStep.getStepType()) {
-                        case FLOW_ADD:
-                            final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
-                            final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
-                            chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
-                            break;
-                        case FLOW_REMOVE:
-                            final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
-                            final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
-                            chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
-                            break;
-                        case FLOW_UPDATE:
-                            final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
-                            final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
-                            chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
-                            break;
-                        case GROUP_ADD:
-                            final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
-                            final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
-                            chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
-                            break;
-                        case GROUP_REMOVE:
-                            final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
-                            final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
-                            chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
-                            break;
-                        case GROUP_UPDATE:
-                            final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
-                            final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
-                            chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
-                            break;
-                        case METER_ADD:
-                            final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
-                            final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
-                            chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
-                            break;
-                        case METER_REMOVE:
-                            final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
-                            final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
-                            chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
-                            break;
-                        case METER_UPDATE:
-                            final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
-                            final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
-                            chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
-                            break;
-                        default:
-                            LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
-                            chainOutput = RpcResultBuilder.<ProcessFlatBatchOutput>status(true)
-                                                          .withRpcErrors(new ArrayList<>())
-                                                          .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
-                                                          .buildFuture();
-                    }
-                    return chainOutput;
+                LOG.trace("batch progressing on step type {}", planStep.getStepType());
+                LOG.trace("batch progressing previous steps result: {}", chainInput.isSuccessful());
+
+                final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
+                switch (planStep.getStepType()) {
+                    case FLOW_ADD:
+                        final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
+                        final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
+                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
+                        break;
+                    case FLOW_REMOVE:
+                        final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
+                        final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
+                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
+                        break;
+                    case FLOW_UPDATE:
+                        final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
+                        final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
+                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
+                        break;
+                    case GROUP_ADD:
+                        final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
+                        final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
+                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
+                        break;
+                    case GROUP_REMOVE:
+                        final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
+                        final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
+                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
+                        break;
+                    case GROUP_UPDATE:
+                        final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
+                        final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
+                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
+                        break;
+                    case METER_ADD:
+                        final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
+                        final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
+                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
+                        break;
+                    case METER_REMOVE:
+                        final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
+                        final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
+                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
+                        break;
+                    case METER_UPDATE:
+                        final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
+                        final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
+                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
+                        break;
+                    default:
+                        LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
+                        chainOutput = RpcResultBuilder.<ProcessFlatBatchOutput>status(true)
+                                                      .withRpcErrors(new ArrayList<>())
+                                                      .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
+                                                      .buildFuture();
                 }
+                return chainOutput;
             }));
             stepOffset += planStep.getTaskBag().size();
         }
index 0449c17519f02ad25b609be3499746af73258a11..a64175a5e5f748dbb28b583b6c93c2d5635653b0 100644 (file)
@@ -13,14 +13,15 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev16032
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
 /**
- * Wrapper for batch step jobs.
+ * Wrapper for batch step jobs (type + function).
  */
 public class BatchStepJob {
 
     private final BatchPlanStep planStep;
     private final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction;
 
-    public BatchStepJob(final BatchPlanStep planStep, final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction) {
+    public BatchStepJob(final BatchPlanStep planStep,
+                        final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction) {
         this.planStep = planStep;
         this.stepFunction = stepFunction;
     }
index a2d93b9ef324617eab81a1845b2e4dacc3471457..7949cc0094d1c5142bb1287213a8c9df54b520b6 100644 (file)
@@ -117,11 +117,11 @@ public class FlatBatchFlowAdapters {
 
     /**
      * @param stepOffset offset of current batch plan step
-     * @return next chained result incorporating results of this step's batch
+     * @return converted {@link ProcessFlatBatchOutput} RPC result
      */
     @VisibleForTesting
     static <T extends BatchFlowOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
-    createBatchFlowChainingFunction(final int stepOffset) {
+    convertBatchFlowResult(final int stepOffset) {
         return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
             @Nullable
             @Override
@@ -154,17 +154,17 @@ public class FlatBatchFlowAdapters {
     }
 
     /**
-     * shortcut for {@link #createBatchFlowChainingFunction(int)} with conversion {@link ListenableFuture}
+     * shortcut for {@link #convertBatchFlowResult(int)} with conversion {@link ListenableFuture}
      *
      * @param <T>                    exact type of batch flow output
      * @param resultUpdateFlowFuture batch flow rpc-result (add/remove/update)
      * @param currentOffset          offset of current batch plan step with respect to entire chain of steps
-     * @return next chained result incorporating results of this step's batch
+     * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
      */
     public static <T extends BatchFlowOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
-    adaptFlowBatchFutureForChain(final Future<RpcResult<T>> resultUpdateFlowFuture,
-                                 final int currentOffset) {
+    convertFlowBatchFutureForChain(final Future<RpcResult<T>> resultUpdateFlowFuture,
+                                   final int currentOffset) {
         return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateFlowFuture),
-                FlatBatchFlowAdapters.<T>createBatchFlowChainingFunction(currentOffset));
+                FlatBatchFlowAdapters.<T>convertBatchFlowResult(currentOffset));
     }
 }
index bad00a2c80f738485d096d7de787e18b08c20450..05731129fc97380643d6d39bf30e353a57b322bf 100644 (file)
@@ -116,11 +116,11 @@ public class FlatBatchGroupAdapters {
 
     /**
      * @param stepOffset offset of current batch plan step
-     * @return next chained result incorporating results of this step's batch
+     * @return converted {@link ProcessFlatBatchOutput} RPC result
      */
     @VisibleForTesting
     static <T extends BatchGroupOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
-    createBatchGroupChainingFunction(final int stepOffset) {
+    convertBatchGroupResult(final int stepOffset) {
         return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
             @Nullable
             @Override
@@ -153,17 +153,17 @@ public class FlatBatchGroupAdapters {
     }
 
     /**
-     * shortcut for {@link #createBatchGroupChainingFunction(int)} with conversion {@link ListenableFuture}
+     * shortcut for {@link #convertBatchGroupResult(int)} with conversion {@link ListenableFuture}
      *
      * @param <T>                     exact type of batch flow output
      * @param resultUpdateGroupFuture batch group rpc-result (add/remove/update)
      * @param currentOffset           offset of current batch plan step with respect to entire chain of steps
-     * @return next chained result incorporating results of this step's batch
+     * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
      */
     public static <T extends BatchGroupOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
-    adaptGroupBatchFutureForChain(final Future<RpcResult<T>> resultUpdateGroupFuture,
-                                  final int currentOffset) {
+    convertGroupBatchFutureForChain(final Future<RpcResult<T>> resultUpdateGroupFuture,
+                                    final int currentOffset) {
         return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateGroupFuture),
-                FlatBatchGroupAdapters.<T>createBatchGroupChainingFunction(currentOffset));
+                FlatBatchGroupAdapters.<T>convertBatchGroupResult(currentOffset));
     }
 }
index 33d6a4bad0eac5de8e3b227baea45a0c721cd29c..2dc3ef40f4e4c400509a030562d54804cb58c2fc 100644 (file)
@@ -116,11 +116,11 @@ public class FlatBatchMeterAdapters {
 
     /**
      * @param stepOffset offset of current batch plan step
-     * @return next chained result incorporating results of this step's batch
+     * @return converted {@link ProcessFlatBatchOutput} RPC result
      */
     @VisibleForTesting
     static <T extends BatchMeterOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
-    createBatchMeterChainingFunction(final int stepOffset) {
+    convertBatchMeterResult(final int stepOffset) {
         return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
             @Nullable
             @Override
@@ -153,17 +153,17 @@ public class FlatBatchMeterAdapters {
     }
 
     /**
-     * shortcut for {@link #createBatchMeterChainingFunction(int)} with conversion {@link ListenableFuture}
+     * shortcut for {@link #convertBatchMeterResult(int)} with conversion {@link ListenableFuture}
      *
      * @param <T>                     exact type of batch flow output
      * @param resultUpdateMeterFuture batch group rpc-result (add/remove/update)
      * @param currentOffset           offset of current batch plan step with respect to entire chain of steps
-     * @return next chained result incorporating results of this step's batch
+     * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
      */
     public static <T extends BatchMeterOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
-    adaptMeterBatchFutureForChain(final Future<RpcResult<T>> resultUpdateMeterFuture,
-                                  final int currentOffset) {
+    convertMeterBatchFutureForChain(final Future<RpcResult<T>> resultUpdateMeterFuture,
+                                    final int currentOffset) {
         return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateMeterFuture),
-                FlatBatchMeterAdapters.<T>createBatchMeterChainingFunction(currentOffset));
+                FlatBatchMeterAdapters.<T>convertBatchMeterResult(currentOffset));
     }
 }
index 49d24e28f59dda7e2e751651c1b13056c46649d9..8bd950cff20db5d2953346b11f69986ad0ab8ea0 100644 (file)
@@ -9,12 +9,16 @@
 package org.opendaylight.openflowplugin.impl.util;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
@@ -26,6 +30,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev16032
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.service.batch.common.rev160322.BatchOrderGrouping;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -173,52 +178,40 @@ public final class FlatBatchUtil {
     }
 
     /**
-     * join errors of left and right rpc result into output
-     *
-     * @param output     target result
-     * @param chainInput left part (chained rpc result)
-     * @param input      right part (result of current operation)
-     * @param <L>        chain type
-     * @param <R>        current operation type
+     * @return RPC result incorporating partial results (state, errors, batch failures)
      */
-    private static <L, R> void joinErrors(final RpcResultBuilder<L> output, final RpcResult<L> chainInput, final RpcResult<R> input) {
-        final Collection<RpcError> rpcErrors = new ArrayList<>(chainInput.getErrors());
-        rpcErrors.addAll(input.getErrors());
-        if (!rpcErrors.isEmpty()) {
-            output.withRpcErrors(rpcErrors);
-        }
-    }
+    @VisibleForTesting
+    static Function<List<RpcResult<ProcessFlatBatchOutput>>, RpcResult<ProcessFlatBatchOutput>> mergeRpcResults() {
+        return jobsResults -> {
+            boolean isSuccessful = true;
+            List<RpcError> rpcErrors = new ArrayList<>();
+            List<BatchFailure> batchFailures = new ArrayList<>();
+
+            for (RpcResult<ProcessFlatBatchOutput> jobResult : jobsResults) {
+                if (jobResult != null) {
+                    isSuccessful = (isSuccessful && jobResult.isSuccessful());
+                    rpcErrors.addAll(jobResult.getErrors());
+                    batchFailures.addAll(jobResult.getResult().getBatchFailure());
+                }
+            }
 
-    /**
-     * create rpc result honoring success/fail outcomes of arguments
-     *
-     * @param chainInput left part (chained rpc result)
-     * @param input      right part (results of current operation)
-     * @param <L>        chain type
-     * @param <R>        current operation type
-     * @return rpc result with combined status
-     */
-    private static <L, R> RpcResultBuilder<L> createNextRpcResultBuilder(final RpcResult<L> chainInput, final RpcResult<R> input) {
-        return RpcResultBuilder.<L>status(input.isSuccessful() && chainInput.isSuccessful());
+            return RpcResultBuilder.<ProcessFlatBatchOutput>status(isSuccessful)
+                    .withRpcErrors(rpcErrors)
+                    .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(batchFailures).build())
+                    .build();
+        };
     }
 
     /**
-     * Create rpc result builder with combined status and sum of all errors.
-     * <br>
-     * Shortcut for {@link #createNextRpcResultBuilder(RpcResult, RpcResult)} and
-     * {@link #joinErrors(RpcResultBuilder, RpcResult, RpcResult)}.
-     *
-     * @param chainInput left part (chained rpc result)
-     * @param input      right part (results of current operation)
-     * @param <L>        chain type
-     * @param <R>        current operation type
-     * @return rpc result with combined status and all errors
+     * Merge list of Futures with partial results into one ListenableFuture with single result.
+     * shortcut for {@link #mergeRpcResults()}
+     * @param firedJobs list of ListenableFutures with RPC results {@link ProcessFlatBatchOutput}
+     * @return ListenableFuture of RPC result with combined status and all errors + batch failures
      */
-    public static <L, R> RpcResultBuilder<L> mergeRpcResults(final RpcResult<L> chainInput, final RpcResult<R> input) {
-        // create rpcResult builder honoring both success/failure of current input and chained input
-        final RpcResultBuilder<L> output = FlatBatchUtil.createNextRpcResultBuilder(chainInput, input);
-        // join errors
-        FlatBatchUtil.joinErrors(output, chainInput, input);
-        return output;
+    public static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> mergeJobsResultsFutures(
+            final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs) {
+        return Futures.transform(Futures.successfulAsList(firedJobs), mergeRpcResults());
     }
+
+
 }
index 26e257697cc01ecb285c4d13b10ec2055fe1446e..c604817978f44ee131bbcf25f78ff84448f2c87a 100644 (file)
@@ -129,7 +129,7 @@ public class FlatBatchFlowAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchFlowAdapters
-                .createBatchFlowChainingFunction(3).apply(input);
+                .convertBatchFlowResult(3).apply(input);
 
         Assert.assertFalse(rpcResult.isSuccessful());
         Assert.assertEquals(1, rpcResult.getErrors().size());
@@ -146,7 +146,7 @@ public class FlatBatchFlowAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchFlowAdapters
-                .createBatchFlowChainingFunction(0).apply(input);
+                .convertBatchFlowResult(0).apply(input);
 
         Assert.assertTrue(rpcResult.isSuccessful());
         Assert.assertEquals(0, rpcResult.getErrors().size());
index 8e289685ee1f7cb9105f690e6fef45bac0db9233..42228173799d9241fd17c0fe99311c0bb4f379d0 100644 (file)
@@ -136,7 +136,7 @@ public class FlatBatchGroupAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchGroupAdapters
-                .createBatchGroupChainingFunction(3).apply(input);
+                .convertBatchGroupResult(3).apply(input);
 
         Assert.assertFalse(rpcResult.isSuccessful());
         Assert.assertEquals(1, rpcResult.getErrors().size());
@@ -153,7 +153,7 @@ public class FlatBatchGroupAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchGroupAdapters
-                .createBatchGroupChainingFunction(0).apply(input);
+                .convertBatchGroupResult(0).apply(input);
 
         Assert.assertTrue(rpcResult.isSuccessful());
         Assert.assertEquals(0, rpcResult.getErrors().size());
index 970ba6a001e4b530aa7e6de2235d9d2715354405..7ef5d07adb691cd2014fed8611fac0cbb55d2593 100644 (file)
@@ -136,7 +136,7 @@ public class FlatBatchMeterAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchMeterAdapters
-                .createBatchMeterChainingFunction(3).apply(input);
+                .convertBatchMeterResult(3).apply(input);
 
         Assert.assertFalse(rpcResult.isSuccessful());
         Assert.assertEquals(1, rpcResult.getErrors().size());
@@ -153,7 +153,7 @@ public class FlatBatchMeterAdaptersTest {
                 .build();
 
         final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchMeterAdapters
-                .createBatchMeterChainingFunction(0).apply(input);
+                .convertBatchMeterResult(0).apply(input);
 
         Assert.assertTrue(rpcResult.isSuccessful());
         Assert.assertEquals(0, rpcResult.getErrors().size());
index 4b1f65715a31523db9e5352174a094a9543eb68a..cdf78c4448272938069664b2633f553bd3a98167 100644 (file)
@@ -16,6 +16,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
@@ -37,6 +39,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev16032
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailureBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.batch.failure.batch.item.id.choice.FlatBatchFailureFlowIdCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -256,20 +262,31 @@ public class FlatBatchUtilTest {
     }
 
     @Test
-    public void testMergeRpcResults() throws Exception {
-        final RpcResult<String> rpcResultFailed = RpcResultBuilder.<String>failed()
-                .withError(RpcError.ErrorType.APPLICATION, "ut-rpcError").build();
-        final RpcResult<String> rpcResultSuccess = RpcResultBuilder.<String>success().build();
+    public void testMergeJobsResultsFutures() throws Exception {
+        final BatchFailure batchFailure = new BatchFailureBuilder()
+                .setBatchOrder(9)
+                .setBatchItemIdChoice(new FlatBatchFailureFlowIdCaseBuilder()
+                        .setFlowId(new FlowId("11"))
+                        .build())
+                .build();
+
+        final ProcessFlatBatchOutput output = new ProcessFlatBatchOutputBuilder().setBatchFailure(Lists.newArrayList(batchFailure)).build();
+
+        final RpcResult<ProcessFlatBatchOutput> rpcResultFailed = RpcResultBuilder.<ProcessFlatBatchOutput>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "ut-rpcError")
+                .withResult(output).build();
+        final RpcResult<ProcessFlatBatchOutput> rpcResultSuccess = RpcResultBuilder.<ProcessFlatBatchOutput>success()
+                .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>())).build();
 
-        final RpcResult<String> rpcResult1 = FlatBatchUtil.mergeRpcResults(rpcResultFailed, rpcResultSuccess).build();
+        final RpcResult<ProcessFlatBatchOutput> rpcResult1 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultFailed, rpcResultSuccess));
         Assert.assertEquals(1, rpcResult1.getErrors().size());
         Assert.assertFalse(rpcResult1.isSuccessful());
 
-        final RpcResult<String> rpcResult2 = FlatBatchUtil.mergeRpcResults(rpcResultFailed, rpcResultFailed).build();
+        final RpcResult<ProcessFlatBatchOutput> rpcResult2 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultFailed, rpcResultFailed));
         Assert.assertEquals(2, rpcResult2.getErrors().size());
         Assert.assertFalse(rpcResult2.isSuccessful());
 
-        final RpcResult<String> rpcResult3 = FlatBatchUtil.mergeRpcResults(rpcResultSuccess, rpcResultSuccess).build();
+        final RpcResult<ProcessFlatBatchOutput> rpcResult3 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultSuccess, rpcResultSuccess));
         Assert.assertEquals(0, rpcResult3.getErrors().size());
         Assert.assertTrue(rpcResult3.isSuccessful());
     }