Reduce use of JdkFutureAdapters
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlatBatchServiceImpl.java
index 3eca9a73ac52210c11875d0e2ef12188efd2f715..f4e0c4be800aa74e1f3ff14bc72cc31c1357a150 100644 (file)
@@ -12,9 +12,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
 import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
 import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}
+ * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}.
  */
 public class SalFlatBatchServiceImpl implements SalFlatBatchService {
     private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class);
@@ -66,25 +66,30 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
                                    final SalGroupsBatchService salGroupsBatchService,
                                    final SalMetersBatchService salMetersBatchService) {
         this.salFlowService = Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null");
-        this.salGroupService = Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
-        this.salMeterService = Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");
+        this.salGroupService =
+                Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null");
+        this.salMeterService =
+                Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null");
     }
 
     @Override
-    public Future<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
-        LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()).getValue(), input.getBatch().size());
+    public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
+        LOG.trace("processing flat batch @ {} : {}",
+                  PathUtil.extractNodeId(input.getNode()).getValue(),
+                  input.getBatch().size());
         // create plan
         final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
         // add barriers where needed
         FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
         // prepare chain elements
-        final List<BatchStepJob> batchChainElements = prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
+        final List<BatchStepJob> batchChainElements =
+                prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError());
         // execute plan with barriers and collect outputs chain correspondingly, collect results
         return executeBatchPlan(batchChainElements);
     }
 
     @VisibleForTesting
-    Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
+    ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
         BatchStepJob batchJob;
         final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs = new ArrayList<>();
         ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
@@ -93,9 +98,10 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
         for (int i = 0; i < batchJobsChain.size(); i++)  {
             batchJob = batchJobsChain.get(i);
             // wire actual job with chain
-            firedJobs.add(Futures.transform(chainSummaryResult, batchJob.getStepFunction()));
+            firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
+                    MoreExecutors.directExecutor()));
             // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
-            if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size()-1)) {
+            if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
                 firedJobs.add(0, chainSummaryResult);
                 chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
                 firedJobs.clear();
@@ -105,7 +111,9 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
     }
 
     @VisibleForTesting
-    List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan, final NodeRef node, final boolean exitOnFirstError) {
+    List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan,
+                                         final NodeRef node,
+                                         final boolean exitOnFirstError) {
         // create batch API calls based on plan steps
         final List<BatchStepJob> chainJobs = new ArrayList<>();
         int stepOffset = 0;
@@ -133,49 +141,75 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
 
         switch (planStep.getStepType()) {
             case FLOW_ADD:
-                final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
-                final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
+                final AddFlowsBatchInput addFlowsBatchInput =
+                        FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
+                final ListenableFuture<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
+                        salFlowService.addFlowsBatch(addFlowsBatchInput);
                 chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
                 break;
             case FLOW_REMOVE:
-                final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
-                final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
-                chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
+                final RemoveFlowsBatchInput removeFlowsBatchInput =
+                        FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
+                final ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
+                        salFlowService.removeFlowsBatch(removeFlowsBatchInput);
+                chainOutput =
+                        FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
                 break;
             case FLOW_UPDATE:
-                final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
-                final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
-                chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
+                final UpdateFlowsBatchInput updateFlowsBatchInput =
+                        FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
+                final ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
+                        salFlowService.updateFlowsBatch(updateFlowsBatchInput);
+                chainOutput =
+                        FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
                 break;
             case GROUP_ADD:
-                final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
-                final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
-                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
+                final AddGroupsBatchInput addGroupsBatchInput =
+                        FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
+                final ListenableFuture<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
+                        salGroupService.addGroupsBatch(addGroupsBatchInput);
+                chainOutput =
+                        FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
                 break;
             case GROUP_REMOVE:
-                final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
-                final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
-                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
+                final RemoveGroupsBatchInput removeGroupsBatchInput =
+                        FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
+                final ListenableFuture<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
+                        salGroupService.removeGroupsBatch(removeGroupsBatchInput);
+                chainOutput =
+                        FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
                 break;
             case GROUP_UPDATE:
-                final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
-                final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
-                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
+                final UpdateGroupsBatchInput updateGroupsBatchInput =
+                        FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
+                final ListenableFuture<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
+                        salGroupService.updateGroupsBatch(updateGroupsBatchInput);
+                chainOutput =
+                        FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
                 break;
             case METER_ADD:
-                final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
-                final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
-                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
+                final AddMetersBatchInput addMetersBatchInput =
+                        FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
+                final ListenableFuture<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
+                        salMeterService.addMetersBatch(addMetersBatchInput);
+                chainOutput =
+                        FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
                 break;
             case METER_REMOVE:
-                final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
-                final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
-                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
+                final RemoveMetersBatchInput removeMetersBatchInput =
+                        FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
+                final ListenableFuture<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
+                        salMeterService.removeMetersBatch(removeMetersBatchInput);
+                chainOutput =
+                        FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
                 break;
             case METER_UPDATE:
-                final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
-                final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
-                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
+                final UpdateMetersBatchInput updateMetersBatchInput =
+                        FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
+                final ListenableFuture<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
+                        salMeterService.updateMetersBatch(updateMetersBatchInput);
+                chainOutput =
+                        FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
                 break;
             default:
                 LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());