Update MRI projects for Aluminium
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalMetersBatchServiceImpl.java
index 9a9a00b2c1abf21e6ecbd24fab55053a7a1d850e..174cb4db7739d845fe2b0dab5f88c225faec4130 100644 (file)
@@ -8,16 +8,13 @@
 
 package org.opendaylight.openflowplugin.impl.services.sal;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Future;
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
 import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
 import org.opendaylight.openflowplugin.impl.util.MeterUtil;
 import org.opendaylight.openflowplugin.impl.util.PathUtil;
@@ -40,6 +37,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.BatchMeterInputUpdateGrouping;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
@@ -64,13 +62,14 @@ public class SalMetersBatchServiceImpl implements SalMetersBatchService {
     private final SalMeterService salMeterService;
     private final FlowCapableTransactionService transactionService;
 
-    public SalMetersBatchServiceImpl(final SalMeterService salMeterService, final FlowCapableTransactionService transactionService) {
+    public SalMetersBatchServiceImpl(final SalMeterService salMeterService,
+                                     final FlowCapableTransactionService transactionService) {
         this.salMeterService = Preconditions.checkNotNull(salMeterService);
         this.transactionService = Preconditions.checkNotNull(transactionService);
     }
 
     @Override
-    public Future<RpcResult<UpdateMetersBatchOutput>> updateMetersBatch(final UpdateMetersBatchInput input) {
+    public ListenableFuture<RpcResult<UpdateMetersBatchOutput>> updateMetersBatch(final UpdateMetersBatchInput input) {
         final List<BatchUpdateMeters> batchUpdateMeters = input.getBatchUpdateMeters();
         LOG.trace("Updating meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), batchUpdateMeters.size());
 
@@ -82,24 +81,20 @@ public class SalMetersBatchServiceImpl implements SalMetersBatchService {
                     .setMeterRef(createMeterRef(input.getNode(), batchMeter))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.updateMeter(updateMeterInput)));
+            resultsLot.add(salMeterService.updateMeter(updateMeterInput));
         }
 
-        final Iterable<Meter> meters = Iterables.transform(batchUpdateMeters, new Function<BatchUpdateMeters, Meter>() {
-                    @Nullable
-                    @Override
-                    public Meter apply(@Nullable final BatchUpdateMeters input) {
-                        return input.getUpdatedBatchedMeter();
-                    }
-                }
-        );
+        final Iterable<Meter> meters = batchUpdateMeters.stream()
+                .map(BatchMeterInputUpdateGrouping::getUpdatedBatchedMeter)
+                .collect(Collectors.toList());
 
         final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
-                Futures.transform(Futures.allAsList(resultsLot), MeterUtil.<UpdateMeterOutput>createCumulativeFunction(
-                        meters, batchUpdateMeters.size()));
+                Futures.transform(Futures.allAsList(resultsLot),
+                        MeterUtil.createCumulativeFunction(meters, batchUpdateMeters.size()),
+                        MoreExecutors.directExecutor());
 
         ListenableFuture<RpcResult<UpdateMetersBatchOutput>> updateMetersBulkFuture =
-                Futures.transform(commonResult, MeterUtil.METER_UPDATE_TRANSFORM);
+                Futures.transform(commonResult, MeterUtil.METER_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             updateMetersBulkFuture = BarrierUtil.chainBarrier(updateMetersBulkFuture, input.getNode(),
@@ -110,23 +105,24 @@ public class SalMetersBatchServiceImpl implements SalMetersBatchService {
     }
 
     @Override
-    public Future<RpcResult<AddMetersBatchOutput>> addMetersBatch(final AddMetersBatchInput input) {
+    public ListenableFuture<RpcResult<AddMetersBatchOutput>> addMetersBatch(final AddMetersBatchInput input) {
         LOG.trace("Adding meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddMeters().size());
         final ArrayList<ListenableFuture<RpcResult<AddMeterOutput>>> resultsLot = new ArrayList<>();
-        for (BatchAddMeters addMeter : input.getBatchAddMeters()) {
+        for (BatchAddMeters addMeter : input.nonnullBatchAddMeters().values()) {
             final AddMeterInput addMeterInput = new AddMeterInputBuilder(addMeter)
                     .setMeterRef(createMeterRef(input.getNode(), addMeter))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.addMeter(addMeterInput)));
+            resultsLot.add(salMeterService.addMeter(addMeterInput));
         }
 
         final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
                 Futures.transform(Futures.allAsList(resultsLot),
-                        MeterUtil.<AddMeterOutput>createCumulativeFunction(input.getBatchAddMeters()));
+                        MeterUtil.createCumulativeFunction(input.nonnullBatchAddMeters().values()),
+                        MoreExecutors.directExecutor());
 
         ListenableFuture<RpcResult<AddMetersBatchOutput>> addMetersBulkFuture =
-                Futures.transform(commonResult, MeterUtil.METER_ADD_TRANSFORM);
+                Futures.transform(commonResult, MeterUtil.METER_ADD_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             addMetersBulkFuture = BarrierUtil.chainBarrier(addMetersBulkFuture, input.getNode(),
@@ -137,23 +133,26 @@ public class SalMetersBatchServiceImpl implements SalMetersBatchService {
     }
 
     @Override
-    public Future<RpcResult<RemoveMetersBatchOutput>> removeMetersBatch(final RemoveMetersBatchInput input) {
-        LOG.trace("Removing meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveMeters().size());
+    public ListenableFuture<RpcResult<RemoveMetersBatchOutput>> removeMetersBatch(final RemoveMetersBatchInput input) {
+        LOG.trace("Removing meters @ {} : {}",
+                  PathUtil.extractNodeId(input.getNode()),
+                  input.getBatchRemoveMeters().size());
         final ArrayList<ListenableFuture<RpcResult<RemoveMeterOutput>>> resultsLot = new ArrayList<>();
-        for (BatchRemoveMeters addMeter : input.getBatchRemoveMeters()) {
+        for (BatchRemoveMeters addMeter : input.nonnullBatchRemoveMeters().values()) {
             final RemoveMeterInput removeMeterInput = new RemoveMeterInputBuilder(addMeter)
                     .setMeterRef(createMeterRef(input.getNode(), addMeter))
                     .setNode(input.getNode())
                     .build();
-            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.removeMeter(removeMeterInput)));
+            resultsLot.add(salMeterService.removeMeter(removeMeterInput));
         }
 
         final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
                 Futures.transform(Futures.allAsList(resultsLot),
-                        MeterUtil.<RemoveMeterOutput>createCumulativeFunction(input.getBatchRemoveMeters()));
+                        MeterUtil.createCumulativeFunction(input.nonnullBatchRemoveMeters().values()),
+                        MoreExecutors.directExecutor());
 
         ListenableFuture<RpcResult<RemoveMetersBatchOutput>> removeMetersBulkFuture =
-                Futures.transform(commonResult, MeterUtil.METER_REMOVE_TRANSFORM);
+                Futures.transform(commonResult, MeterUtil.METER_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
 
         if (input.isBarrierAfter()) {
             removeMetersBulkFuture = BarrierUtil.chainBarrier(removeMetersBulkFuture, input.getNode(),