import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Future;
import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
import org.opendaylight.openflowplugin.impl.services.batch.BatchStepJob;
import org.opendaylight.openflowplugin.impl.services.batch.FlatBatchFlowAdapters;
}
@Override
- public Future<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
+ public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> processFlatBatch(final ProcessFlatBatchInput input) {
LOG.trace("processing flat batch @ {} : {}",
PathUtil.extractNodeId(input.getNode()).getValue(),
input.getBatch().size());
// create plan
- final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch());
+ final List<BatchPlanStep> batchPlan = FlatBatchUtil.assembleBatchPlan(input.nonnullBatch().values());
// add barriers where needed
FlatBatchUtil.markBarriersWhereNeeded(batchPlan);
// prepare chain elements
}
@VisibleForTesting
- Future<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
+ ListenableFuture<RpcResult<ProcessFlatBatchOutput>> executeBatchPlan(final List<BatchStepJob> batchJobsChain) {
BatchStepJob batchJob;
final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs = new ArrayList<>();
ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainSummaryResult =
for (int i = 0; i < batchJobsChain.size(); i++) {
batchJob = batchJobsChain.get(i);
// wire actual job with chain
- firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction()));
+ firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(),
+ MoreExecutors.directExecutor()));
// if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
- if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size() - 1)) {
+ if (batchJob.getPlanStep().isBarrierAfter() || i == batchJobsChain.size() - 1) {
firedJobs.add(0, chainSummaryResult);
chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
firedJobs.clear();
case FLOW_ADD:
final AddFlowsBatchInput addFlowsBatchInput =
FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
- final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
+ final ListenableFuture<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture =
salFlowService.addFlowsBatch(addFlowsBatchInput);
chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
break;
case FLOW_REMOVE:
final RemoveFlowsBatchInput removeFlowsBatchInput =
FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
- final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
+ final ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture =
salFlowService.removeFlowsBatch(removeFlowsBatchInput);
chainOutput =
FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
case FLOW_UPDATE:
final UpdateFlowsBatchInput updateFlowsBatchInput =
FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
- final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
+ final ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture =
salFlowService.updateFlowsBatch(updateFlowsBatchInput);
chainOutput =
FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
case GROUP_ADD:
final AddGroupsBatchInput addGroupsBatchInput =
FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
- final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
+ final ListenableFuture<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture =
salGroupService.addGroupsBatch(addGroupsBatchInput);
chainOutput =
FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
case GROUP_REMOVE:
final RemoveGroupsBatchInput removeGroupsBatchInput =
FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
- final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
+ final ListenableFuture<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture =
salGroupService.removeGroupsBatch(removeGroupsBatchInput);
chainOutput =
FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
case GROUP_UPDATE:
final UpdateGroupsBatchInput updateGroupsBatchInput =
FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
- final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
+ final ListenableFuture<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture =
salGroupService.updateGroupsBatch(updateGroupsBatchInput);
chainOutput =
FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
case METER_ADD:
final AddMetersBatchInput addMetersBatchInput =
FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
- final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
+ final ListenableFuture<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture =
salMeterService.addMetersBatch(addMetersBatchInput);
chainOutput =
FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
case METER_REMOVE:
final RemoveMetersBatchInput removeMetersBatchInput =
FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
- final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
+ final ListenableFuture<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture =
salMeterService.removeMetersBatch(removeMetersBatchInput);
chainOutput =
FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
case METER_UPDATE:
final UpdateMetersBatchInput updateMetersBatchInput =
FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
- final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
+ final ListenableFuture<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture =
salMeterService.updateMetersBatch(updateMetersBatchInput);
chainOutput =
FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);