X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fservices%2Fsal%2FSalFlatBatchServiceImpl.java;h=bc1ee5943ac2da2932498c588e69969cc526efc4;hb=5ae6b8928b7ed3ce989e304c0c57350ca3f692b5;hp=3eca9a73ac52210c11875d0e2ef12188efd2f715;hpb=c373ae004e9e04a40ea9c3a7d7476fdf47faee09;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalFlatBatchServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalFlatBatchServiceImpl.java index 3eca9a73ac..bc1ee5943a 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalFlatBatchServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/sal/SalFlatBatchServiceImpl.java @@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Future; @@ -53,7 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService} + * Default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}. */ public class SalFlatBatchServiceImpl implements SalFlatBatchService { private static final Logger LOG = LoggerFactory.getLogger(SalFlatBatchServiceImpl.class); @@ -66,19 +67,24 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService { final SalGroupsBatchService salGroupsBatchService, final SalMetersBatchService salMetersBatchService) { this.salFlowService = Preconditions.checkNotNull(salFlowBatchService, "delegate flow service must not be null"); - this.salGroupService = Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null"); - this.salMeterService = Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null"); + this.salGroupService = + Preconditions.checkNotNull(salGroupsBatchService, "delegate group service must not be null"); + this.salMeterService = + Preconditions.checkNotNull(salMetersBatchService, "delegate meter service must not be null"); } @Override public Future> processFlatBatch(final ProcessFlatBatchInput input) { - LOG.trace("processing flat batch @ {} : {}", PathUtil.extractNodeId(input.getNode()).getValue(), input.getBatch().size()); + LOG.trace("processing flat batch @ {} : {}", + PathUtil.extractNodeId(input.getNode()).getValue(), + input.getBatch().size()); // create plan final List batchPlan = FlatBatchUtil.assembleBatchPlan(input.getBatch()); // add barriers where needed FlatBatchUtil.markBarriersWhereNeeded(batchPlan); // prepare chain elements - final List batchChainElements = prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError()); + final List batchChainElements = + prepareBatchChain(batchPlan, input.getNode(), input.isExitOnFirstError()); // execute plan with barriers and collect outputs chain correspondingly, collect results return executeBatchPlan(batchChainElements); } @@ -93,9 +99,10 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService { for (int i = 0; i < batchJobsChain.size(); i++) { batchJob = batchJobsChain.get(i); // wire actual job with chain - firedJobs.add(Futures.transform(chainSummaryResult, batchJob.getStepFunction())); + firedJobs.add(Futures.transformAsync(chainSummaryResult, batchJob.getStepFunction(), + MoreExecutors.directExecutor())); // if barrier after actual job is needed or it is the last job -> merge fired job results with chain result - if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size()-1)) { + if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size() - 1)) { firedJobs.add(0, chainSummaryResult); chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs); firedJobs.clear(); @@ -105,7 +112,9 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService { } @VisibleForTesting - List prepareBatchChain(final List batchPlan, final NodeRef node, final boolean exitOnFirstError) { + List prepareBatchChain(final List batchPlan, + final NodeRef node, + final boolean exitOnFirstError) { // create batch API calls based on plan steps final List chainJobs = new ArrayList<>(); int stepOffset = 0; @@ -133,49 +142,75 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService { switch (planStep.getStepType()) { case FLOW_ADD: - final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node); - final Future> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput); + final AddFlowsBatchInput addFlowsBatchInput = + FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node); + final Future> resultAddFlowFuture = + salFlowService.addFlowsBatch(addFlowsBatchInput); chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset); break; case FLOW_REMOVE: - final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node); - final Future> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput); - chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset); + final RemoveFlowsBatchInput removeFlowsBatchInput = + FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node); + final Future> resultRemoveFlowFuture = + salFlowService.removeFlowsBatch(removeFlowsBatchInput); + chainOutput = + FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset); break; case FLOW_UPDATE: - final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node); - final Future> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput); - chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset); + final UpdateFlowsBatchInput updateFlowsBatchInput = + FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node); + final Future> resultUpdateFlowFuture = + salFlowService.updateFlowsBatch(updateFlowsBatchInput); + chainOutput = + FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset); break; case GROUP_ADD: - final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node); - final Future> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput); - chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset); + final AddGroupsBatchInput addGroupsBatchInput = + FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node); + final Future> resultAddGroupFuture = + salGroupService.addGroupsBatch(addGroupsBatchInput); + chainOutput = + FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset); break; case GROUP_REMOVE: - final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node); - final Future> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput); - chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset); + final RemoveGroupsBatchInput removeGroupsBatchInput = + FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node); + final Future> resultRemoveGroupFuture = + salGroupService.removeGroupsBatch(removeGroupsBatchInput); + chainOutput = + FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset); break; case GROUP_UPDATE: - final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node); - final Future> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput); - chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset); + final UpdateGroupsBatchInput updateGroupsBatchInput = + FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node); + final Future> resultUpdateGroupFuture = + salGroupService.updateGroupsBatch(updateGroupsBatchInput); + chainOutput = + FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset); break; case METER_ADD: - final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node); - final Future> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput); - chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset); + final AddMetersBatchInput addMetersBatchInput = + FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node); + final Future> resultAddMeterFuture = + salMeterService.addMetersBatch(addMetersBatchInput); + chainOutput = + FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset); break; case METER_REMOVE: - final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node); - final Future> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput); - chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset); + final RemoveMetersBatchInput removeMetersBatchInput = + FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node); + final Future> resultRemoveMeterFuture = + salMeterService.removeMetersBatch(removeMetersBatchInput); + chainOutput = + FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset); break; case METER_UPDATE: - final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node); - final Future> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput); - chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset); + final UpdateMetersBatchInput updateMetersBatchInput = + FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node); + final Future> resultUpdateMeterFuture = + salMeterService.updateMetersBatch(updateMetersBatchInput); + chainOutput = + FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset); break; default: LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());