package org.opendaylight.openflowplugin.impl.services.sal;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Future;
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
import org.opendaylight.openflowplugin.impl.util.MeterUtil;
import org.opendaylight.openflowplugin.impl.util.PathUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.BatchMeterInputUpdateGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
private final SalMeterService salMeterService;
private final FlowCapableTransactionService transactionService;
- public SalMetersBatchServiceImpl(final SalMeterService salMeterService, final FlowCapableTransactionService transactionService) {
+ public SalMetersBatchServiceImpl(final SalMeterService salMeterService,
+ final FlowCapableTransactionService transactionService) {
this.salMeterService = Preconditions.checkNotNull(salMeterService);
this.transactionService = Preconditions.checkNotNull(transactionService);
}
@Override
- public Future<RpcResult<UpdateMetersBatchOutput>> updateMetersBatch(final UpdateMetersBatchInput input) {
+ public ListenableFuture<RpcResult<UpdateMetersBatchOutput>> updateMetersBatch(final UpdateMetersBatchInput input) {
final List<BatchUpdateMeters> batchUpdateMeters = input.getBatchUpdateMeters();
LOG.trace("Updating meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), batchUpdateMeters.size());
.setMeterRef(createMeterRef(input.getNode(), batchMeter))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.updateMeter(updateMeterInput)));
+ resultsLot.add(salMeterService.updateMeter(updateMeterInput));
}
- final Iterable<Meter> meters = Iterables.transform(batchUpdateMeters, new Function<BatchUpdateMeters, Meter>() {
- @Nullable
- @Override
- public Meter apply(@Nullable final BatchUpdateMeters input) {
- return input.getUpdatedBatchedMeter();
- }
- }
- );
+ final Iterable<Meter> meters = batchUpdateMeters.stream()
+ .map(BatchMeterInputUpdateGrouping::getUpdatedBatchedMeter)
+ .collect(Collectors.toList());
final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
- Futures.transform(Futures.allAsList(resultsLot), MeterUtil.<UpdateMeterOutput>createCumulativeFunction(
- meters, batchUpdateMeters.size()));
+ Futures.transform(Futures.allAsList(resultsLot),
+ MeterUtil.createCumulativeFunction(meters, batchUpdateMeters.size()),
+ MoreExecutors.directExecutor());
ListenableFuture<RpcResult<UpdateMetersBatchOutput>> updateMetersBulkFuture =
- Futures.transform(commonResult, MeterUtil.METER_UPDATE_TRANSFORM);
+ Futures.transform(commonResult, MeterUtil.METER_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
updateMetersBulkFuture = BarrierUtil.chainBarrier(updateMetersBulkFuture, input.getNode(),
}
@Override
- public Future<RpcResult<AddMetersBatchOutput>> addMetersBatch(final AddMetersBatchInput input) {
+ public ListenableFuture<RpcResult<AddMetersBatchOutput>> addMetersBatch(final AddMetersBatchInput input) {
LOG.trace("Adding meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddMeters().size());
final ArrayList<ListenableFuture<RpcResult<AddMeterOutput>>> resultsLot = new ArrayList<>();
- for (BatchAddMeters addMeter : input.getBatchAddMeters()) {
+ for (BatchAddMeters addMeter : input.nonnullBatchAddMeters().values()) {
final AddMeterInput addMeterInput = new AddMeterInputBuilder(addMeter)
.setMeterRef(createMeterRef(input.getNode(), addMeter))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.addMeter(addMeterInput)));
+ resultsLot.add(salMeterService.addMeter(addMeterInput));
}
final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
Futures.transform(Futures.allAsList(resultsLot),
- MeterUtil.<AddMeterOutput>createCumulativeFunction(input.getBatchAddMeters()));
+ MeterUtil.createCumulativeFunction(input.nonnullBatchAddMeters().values()),
+ MoreExecutors.directExecutor());
ListenableFuture<RpcResult<AddMetersBatchOutput>> addMetersBulkFuture =
- Futures.transform(commonResult, MeterUtil.METER_ADD_TRANSFORM);
+ Futures.transform(commonResult, MeterUtil.METER_ADD_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
addMetersBulkFuture = BarrierUtil.chainBarrier(addMetersBulkFuture, input.getNode(),
}
@Override
- public Future<RpcResult<RemoveMetersBatchOutput>> removeMetersBatch(final RemoveMetersBatchInput input) {
- LOG.trace("Removing meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveMeters().size());
+ public ListenableFuture<RpcResult<RemoveMetersBatchOutput>> removeMetersBatch(final RemoveMetersBatchInput input) {
+ LOG.trace("Removing meters @ {} : {}",
+ PathUtil.extractNodeId(input.getNode()),
+ input.getBatchRemoveMeters().size());
final ArrayList<ListenableFuture<RpcResult<RemoveMeterOutput>>> resultsLot = new ArrayList<>();
- for (BatchRemoveMeters addMeter : input.getBatchRemoveMeters()) {
+ for (BatchRemoveMeters addMeter : input.nonnullBatchRemoveMeters().values()) {
final RemoveMeterInput removeMeterInput = new RemoveMeterInputBuilder(addMeter)
.setMeterRef(createMeterRef(input.getNode(), addMeter))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.removeMeter(removeMeterInput)));
+ resultsLot.add(salMeterService.removeMeter(removeMeterInput));
}
final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
Futures.transform(Futures.allAsList(resultsLot),
- MeterUtil.<RemoveMeterOutput>createCumulativeFunction(input.getBatchRemoveMeters()));
+ MeterUtil.createCumulativeFunction(input.nonnullBatchRemoveMeters().values()),
+ MoreExecutors.directExecutor());
ListenableFuture<RpcResult<RemoveMetersBatchOutput>> removeMetersBulkFuture =
- Futures.transform(commonResult, MeterUtil.METER_REMOVE_TRANSFORM);
+ Futures.transform(commonResult, MeterUtil.METER_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
removeMetersBulkFuture = BarrierUtil.chainBarrier(removeMetersBulkFuture, input.getNode(),