Update MRI projects for Aluminium
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowsBatchServiceImpl.java
index 73ca65df3bfd4a72d0dff6766a18ee54b6d8d303..40245358f9c699cbab511fa87b06edc919d549ce 100644 (file)
@@ -9,11 +9,10 @@ package org.opendaylight.openflowplugin.impl.services.sal;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
 import org.opendaylight.openflowplugin.impl.util.FlowUtil;
 import org.opendaylight.openflowplugin.impl.util.PathUtil;
@@ -61,26 +60,31 @@ public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
     public SalFlowsBatchServiceImpl(final SalFlowService salFlowService,
                                     final FlowCapableTransactionService transactionService) {
         this.salFlowService = Preconditions.checkNotNull(salFlowService, "delegate flow service must not be null");
-        this.transactionService = Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
+        this.transactionService =
+                Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
     }
 
     @Override
-    public Future<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
-        LOG.trace("Removing flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveFlows().size());
+    public ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
+        LOG.trace("Removing flows @ {} : {}",
+                  PathUtil.extractNodeId(input.getNode()),
+                  input.getBatchRemoveFlows().size());
         final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
-        for (BatchFlowInputGrouping batchFlow : input.getBatchRemoveFlows()) {
+        for (BatchFlowInputGrouping batchFlow : input.nonnullBatchRemoveFlows().values()) {
             final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.removeFlow(removeFlowInput)));
+            resultsLot.add(salFlowService.removeFlow(removeFlowInput));
         }
 
         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
                 Futures.transform(Futures.successfulAsList(resultsLot),
-                        FlowUtil.<RemoveFlowOutput>createCumulatingFunction(input.getBatchRemoveFlows()));
+                        FlowUtil.createCumulatingFunction(input.nonnullBatchRemoveFlows().values()),
+                        MoreExecutors.directExecutor());
 
-        ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM);
+        ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture =
+                Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             removeFlowsBulkFuture = BarrierUtil.chainBarrier(removeFlowsBulkFuture, input.getNode(),
@@ -91,23 +95,24 @@ public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
     }
 
     @Override
-    public Future<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
+    public ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
         LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
         final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
-        for (BatchFlowInputGrouping batchFlow : input.getBatchAddFlows()) {
+        for (BatchFlowInputGrouping batchFlow : input.nonnullBatchAddFlows().values()) {
             final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.addFlow(addFlowInput)));
+            resultsLot.add(salFlowService.addFlow(addFlowInput));
         }
 
         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
                 Futures.transform(Futures.successfulAsList(resultsLot),
-                        FlowUtil.<AddFlowOutput>createCumulatingFunction(input.getBatchAddFlows()));
+                        FlowUtil.createCumulatingFunction(input.nonnullBatchAddFlows().values()),
+                        MoreExecutors.directExecutor());
 
         ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
-                Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM);
+                Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             addFlowsBulkFuture = BarrierUtil.chainBarrier(addFlowsBulkFuture, input.getNode(),
@@ -128,23 +133,28 @@ public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
-        LOG.trace("Updating flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchUpdateFlows().size());
+    public ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
+        LOG.trace("Updating flows @ {} : {}",
+                  PathUtil.extractNodeId(input.getNode()),
+                  input.getBatchUpdateFlows().size());
         final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
-        for (BatchUpdateFlows batchFlow : input.getBatchUpdateFlows()) {
+        for (BatchUpdateFlows batchFlow : input.nonnullBatchUpdateFlows().values()) {
             final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
                     .setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
                     .setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
                     .setFlowRef(createFlowRef(input.getNode(), batchFlow))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.updateFlow(updateFlowInput)));
+            resultsLot.add(salFlowService.updateFlow(updateFlowInput));
         }
 
         final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
-                Futures.transform(Futures.successfulAsList(resultsLot), FlowUtil.<UpdateFlowOutput>createCumulatingFunction(input.getBatchUpdateFlows()));
+                Futures.transform(Futures.successfulAsList(resultsLot),
+                                  FlowUtil.createCumulatingFunction(input.nonnullBatchUpdateFlows().values()),
+                        MoreExecutors.directExecutor());
 
-        ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM);
+        ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture =
+                Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             updateFlowsBulkFuture = BarrierUtil.chainBarrier(updateFlowsBulkFuture, input.getNode(),
@@ -153,5 +163,4 @@ public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
 
         return updateFlowsBulkFuture;
     }
-
 }