import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
import org.opendaylight.openflowplugin.impl.util.FlowUtil;
import org.opendaylight.openflowplugin.impl.util.PathUtil;
public SalFlowsBatchServiceImpl(final SalFlowService salFlowService,
final FlowCapableTransactionService transactionService) {
this.salFlowService = Preconditions.checkNotNull(salFlowService, "delegate flow service must not be null");
- this.transactionService = Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
+ this.transactionService =
+ Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
}
@Override
- public Future<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
- LOG.trace("Removing flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveFlows().size());
+ public ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
+ LOG.trace("Removing flows @ {} : {}",
+ PathUtil.extractNodeId(input.getNode()),
+ input.getBatchRemoveFlows().size());
final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchFlowInputGrouping batchFlow : input.getBatchRemoveFlows()) {
+ for (BatchFlowInputGrouping batchFlow : input.nonnullBatchRemoveFlows().values()) {
final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.removeFlow(removeFlowInput)));
+ resultsLot.add(salFlowService.removeFlow(removeFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
Futures.transform(Futures.successfulAsList(resultsLot),
- FlowUtil.<RemoveFlowOutput>createCumulatingFunction(input.getBatchRemoveFlows()));
+ FlowUtil.createCumulatingFunction(input.nonnullBatchRemoveFlows().values()),
+ MoreExecutors.directExecutor());
- ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM);
+ ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture =
+ Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
removeFlowsBulkFuture = BarrierUtil.chainBarrier(removeFlowsBulkFuture, input.getNode(),
}
@Override
- public Future<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
+ public ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchFlowInputGrouping batchFlow : input.getBatchAddFlows()) {
+ for (BatchFlowInputGrouping batchFlow : input.nonnullBatchAddFlows().values()) {
final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.addFlow(addFlowInput)));
+ resultsLot.add(salFlowService.addFlow(addFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
Futures.transform(Futures.successfulAsList(resultsLot),
- FlowUtil.<AddFlowOutput>createCumulatingFunction(input.getBatchAddFlows()));
+ FlowUtil.createCumulatingFunction(input.nonnullBatchAddFlows().values()),
+ MoreExecutors.directExecutor());
ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
- Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM);
+ Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
addFlowsBulkFuture = BarrierUtil.chainBarrier(addFlowsBulkFuture, input.getNode(),
}
@Override
- public Future<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
- LOG.trace("Updating flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchUpdateFlows().size());
+ public ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
+ LOG.trace("Updating flows @ {} : {}",
+ PathUtil.extractNodeId(input.getNode()),
+ input.getBatchUpdateFlows().size());
final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
- for (BatchUpdateFlows batchFlow : input.getBatchUpdateFlows()) {
+ for (BatchUpdateFlows batchFlow : input.nonnullBatchUpdateFlows().values()) {
final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
.setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
.setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
.setFlowRef(createFlowRef(input.getNode(), batchFlow))
.setNode(input.getNode())
.build();
- resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.updateFlow(updateFlowInput)));
+ resultsLot.add(salFlowService.updateFlow(updateFlowInput));
}
final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
- Futures.transform(Futures.successfulAsList(resultsLot), FlowUtil.<UpdateFlowOutput>createCumulatingFunction(input.getBatchUpdateFlows()));
+ Futures.transform(Futures.successfulAsList(resultsLot),
+ FlowUtil.createCumulatingFunction(input.nonnullBatchUpdateFlows().values()),
+ MoreExecutors.directExecutor());
- ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM);
+ ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture =
+ Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM, MoreExecutors.directExecutor());
if (input.isBarrierAfter()) {
updateFlowsBulkFuture = BarrierUtil.chainBarrier(updateFlowsBulkFuture, input.getNode(),
return updateFlowsBulkFuture;
}
-
}