import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
// if barrier after actual job is needed or it is the last job -> merge fired job results with chain result
if ((batchJob.getPlanStep().isBarrierAfter()) || (i == batchJobsChain.size()-1)) {
firedJobs.add(0, chainSummaryResult);
- chainSummaryResult = mergeJobsResultsFutures(firedJobs);
+ chainSummaryResult = FlatBatchUtil.mergeJobsResultsFutures(firedJobs);
firedJobs.clear();
}
}
return chainSummaryResult;
}
- private static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> mergeJobsResultsFutures(
- final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs) {
-
- ListenableFuture<List<RpcResult<ProcessFlatBatchOutput>>> jobs = Futures.successfulAsList(firedJobs);
-
- return Futures.transform(jobs, new AsyncFunction<List<RpcResult<ProcessFlatBatchOutput>>, RpcResult<ProcessFlatBatchOutput>>() {
- @Override
- public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(List<RpcResult<ProcessFlatBatchOutput>> jobsResults) {
- boolean isSuccessful = true;
- List<RpcError> rpcErrors = new ArrayList<>();
- List<BatchFailure> batchFailures = new ArrayList<>();
-
- for (RpcResult<ProcessFlatBatchOutput> jobResult : jobsResults) {
- if (jobResult != null) {
- isSuccessful = (isSuccessful && jobResult.isSuccessful());
- rpcErrors.addAll(jobResult.getErrors());
- batchFailures.addAll(jobResult.getResult().getBatchFailure());
- }
- }
-
- return RpcResultBuilder.<ProcessFlatBatchOutput>status(isSuccessful)
- .withRpcErrors(rpcErrors)
- .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(batchFailures).build())
- .buildFuture();
- }
- });
- }
-
@VisibleForTesting
List<BatchStepJob> prepareBatchChain(final List<BatchPlanStep> batchPlan, final NodeRef node, final boolean exitOnFirstError) {
// create batch API calls based on plan steps
int stepOffset = 0;
for (final BatchPlanStep planStep : batchPlan) {
final int currentOffset = stepOffset;
- chainJobs.add(new BatchStepJob(planStep, new AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>>() {
- @Override
- public ListenableFuture<RpcResult<ProcessFlatBatchOutput>> apply(final RpcResult<ProcessFlatBatchOutput> chainInput) {
- if (exitOnFirstError && !chainInput.isSuccessful()) {
- LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
- return RpcResultBuilder.<ProcessFlatBatchOutput>status(false)
- .withRpcErrors(new ArrayList<>())
- .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
- .buildFuture();
- }
-
- LOG.trace("batch progressing on step type {}", planStep.getStepType());
- LOG.trace("batch progressing previous steps result: {}", chainInput.isSuccessful());
+ chainJobs.add(new BatchStepJob(planStep, chainInput -> {
+ if (exitOnFirstError && !chainInput.isSuccessful()) {
+ LOG.debug("error on flat batch chain occurred -> skipping step {}", planStep.getStepType());
+ return RpcResultBuilder.<ProcessFlatBatchOutput>status(false)
+ .withRpcErrors(new ArrayList<>())
+ .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
+ .buildFuture();
+ }
- final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
- switch (planStep.getStepType()) {
- case FLOW_ADD:
- final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
- final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
- chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
- break;
- case FLOW_REMOVE:
- final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
- final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
- chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
- break;
- case FLOW_UPDATE:
- final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
- final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
- chainOutput = FlatBatchFlowAdapters.adaptFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
- break;
- case GROUP_ADD:
- final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
- final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
- chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
- break;
- case GROUP_REMOVE:
- final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
- final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
- chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
- break;
- case GROUP_UPDATE:
- final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
- final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
- chainOutput = FlatBatchGroupAdapters.adaptGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
- break;
- case METER_ADD:
- final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
- final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
- chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
- break;
- case METER_REMOVE:
- final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
- final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
- chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
- break;
- case METER_UPDATE:
- final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
- final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
- chainOutput = FlatBatchMeterAdapters.adaptMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
- break;
- default:
- LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
- chainOutput = RpcResultBuilder.<ProcessFlatBatchOutput>status(true)
- .withRpcErrors(new ArrayList<>())
- .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
- .buildFuture();
- }
- return chainOutput;
+ LOG.trace("batch progressing on step type {}", planStep.getStepType());
+ LOG.trace("batch progressing previous steps result: {}", chainInput.isSuccessful());
+
+ final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
+ switch (planStep.getStepType()) {
+ case FLOW_ADD:
+ final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
+ final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
+ chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
+ break;
+ case FLOW_REMOVE:
+ final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
+ final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
+ chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
+ break;
+ case FLOW_UPDATE:
+ final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
+ final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
+ chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
+ break;
+ case GROUP_ADD:
+ final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
+ final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
+ chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
+ break;
+ case GROUP_REMOVE:
+ final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
+ final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
+ chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
+ break;
+ case GROUP_UPDATE:
+ final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
+ final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
+ chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
+ break;
+ case METER_ADD:
+ final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
+ final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
+ chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
+ break;
+ case METER_REMOVE:
+ final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
+ final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
+ chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
+ break;
+ case METER_UPDATE:
+ final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
+ final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
+ chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
+ break;
+ default:
+ LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
+ chainOutput = RpcResultBuilder.<ProcessFlatBatchOutput>status(true)
+ .withRpcErrors(new ArrayList<>())
+ .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>()).build())
+ .buildFuture();
}
+ return chainOutput;
}));
stepOffset += planStep.getTaskBag().size();
}
import org.opendaylight.yangtools.yang.common.RpcResult;
/**
- * Wrapper for batch step jobs.
+ * Wrapper for batch step jobs (type + function).
*/
public class BatchStepJob {
private final BatchPlanStep planStep;
private final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction;
- public BatchStepJob(final BatchPlanStep planStep, final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction) {
+ public BatchStepJob(final BatchPlanStep planStep,
+ final AsyncFunction<RpcResult<ProcessFlatBatchOutput>, RpcResult<ProcessFlatBatchOutput>> stepFunction) {
this.planStep = planStep;
this.stepFunction = stepFunction;
}
/**
* @param stepOffset offset of current batch plan step
- * @return next chained result incorporating results of this step's batch
+ * @return converted {@link ProcessFlatBatchOutput} RPC result
*/
@VisibleForTesting
static <T extends BatchFlowOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
- createBatchFlowChainingFunction(final int stepOffset) {
+ convertBatchFlowResult(final int stepOffset) {
return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
@Nullable
@Override
}
/**
- * shortcut for {@link #createBatchFlowChainingFunction(int)} with conversion {@link ListenableFuture}
+ * shortcut for {@link #convertBatchFlowResult(int)} with conversion {@link ListenableFuture}
*
* @param <T> exact type of batch flow output
* @param resultUpdateFlowFuture batch flow rpc-result (add/remove/update)
* @param currentOffset offset of current batch plan step with respect to entire chain of steps
- * @return next chained result incorporating results of this step's batch
+ * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
*/
public static <T extends BatchFlowOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
- adaptFlowBatchFutureForChain(final Future<RpcResult<T>> resultUpdateFlowFuture,
- final int currentOffset) {
+ convertFlowBatchFutureForChain(final Future<RpcResult<T>> resultUpdateFlowFuture,
+ final int currentOffset) {
return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateFlowFuture),
- FlatBatchFlowAdapters.<T>createBatchFlowChainingFunction(currentOffset));
+ FlatBatchFlowAdapters.<T>convertBatchFlowResult(currentOffset));
}
}
/**
* @param stepOffset offset of current batch plan step
- * @return next chained result incorporating results of this step's batch
+ * @return converted {@link ProcessFlatBatchOutput} RPC result
*/
@VisibleForTesting
static <T extends BatchGroupOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
- createBatchGroupChainingFunction(final int stepOffset) {
+ convertBatchGroupResult(final int stepOffset) {
return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
@Nullable
@Override
}
/**
- * shortcut for {@link #createBatchGroupChainingFunction(int)} with conversion {@link ListenableFuture}
+ * shortcut for {@link #convertBatchGroupResult(int)} with conversion {@link ListenableFuture}
*
* @param <T> exact type of batch flow output
* @param resultUpdateGroupFuture batch group rpc-result (add/remove/update)
* @param currentOffset offset of current batch plan step with respect to entire chain of steps
- * @return next chained result incorporating results of this step's batch
+ * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
*/
public static <T extends BatchGroupOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
- adaptGroupBatchFutureForChain(final Future<RpcResult<T>> resultUpdateGroupFuture,
- final int currentOffset) {
+ convertGroupBatchFutureForChain(final Future<RpcResult<T>> resultUpdateGroupFuture,
+ final int currentOffset) {
return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateGroupFuture),
- FlatBatchGroupAdapters.<T>createBatchGroupChainingFunction(currentOffset));
+ FlatBatchGroupAdapters.<T>convertBatchGroupResult(currentOffset));
}
}
/**
* @param stepOffset offset of current batch plan step
- * @return next chained result incorporating results of this step's batch
+ * @return converted {@link ProcessFlatBatchOutput} RPC result
*/
@VisibleForTesting
static <T extends BatchMeterOutputListGrouping> Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>
- createBatchMeterChainingFunction(final int stepOffset) {
+ convertBatchMeterResult(final int stepOffset) {
return new Function<RpcResult<T>, RpcResult<ProcessFlatBatchOutput>>() {
@Nullable
@Override
}
/**
- * shortcut for {@link #createBatchMeterChainingFunction(int)} with conversion {@link ListenableFuture}
+ * shortcut for {@link #convertBatchMeterResult(int)} with conversion {@link ListenableFuture}
*
* @param <T> exact type of batch flow output
* @param resultUpdateMeterFuture batch group rpc-result (add/remove/update)
* @param currentOffset offset of current batch plan step with respect to entire chain of steps
- * @return next chained result incorporating results of this step's batch
+ * @return ListenableFuture with converted result {@link ProcessFlatBatchOutput}
*/
public static <T extends BatchMeterOutputListGrouping> ListenableFuture<RpcResult<ProcessFlatBatchOutput>>
- adaptMeterBatchFutureForChain(final Future<RpcResult<T>> resultUpdateMeterFuture,
- final int currentOffset) {
+ convertMeterBatchFutureForChain(final Future<RpcResult<T>> resultUpdateMeterFuture,
+ final int currentOffset) {
return Futures.transform(JdkFutureAdapters.listenInPoolThread(resultUpdateMeterFuture),
- FlatBatchMeterAdapters.<T>createBatchMeterChainingFunction(currentOffset));
+ FlatBatchMeterAdapters.<T>convertBatchMeterResult(currentOffset));
}
}
package org.opendaylight.openflowplugin.impl.util;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
import org.opendaylight.openflowplugin.impl.services.batch.BatchStepType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchAddFlowCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateFlowCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateGroupCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.FlatBatchUpdateMeterCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
import org.opendaylight.yang.gen.v1.urn.opendaylight.service.batch.common.rev160322.BatchOrderGrouping;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.common.RpcError;
}
/**
- * join errors of left and right rpc result into output
- *
- * @param output target result
- * @param chainInput left part (chained rpc result)
- * @param input right part (result of current operation)
- * @param <L> chain type
- * @param <R> current operation type
+ * @return RPC result incorporating partial results (state, errors, batch failures)
*/
- private static <L, R> void joinErrors(final RpcResultBuilder<L> output, final RpcResult<L> chainInput, final RpcResult<R> input) {
- final Collection<RpcError> rpcErrors = new ArrayList<>(chainInput.getErrors());
- rpcErrors.addAll(input.getErrors());
- if (!rpcErrors.isEmpty()) {
- output.withRpcErrors(rpcErrors);
- }
- }
+ @VisibleForTesting
+ static Function<List<RpcResult<ProcessFlatBatchOutput>>, RpcResult<ProcessFlatBatchOutput>> mergeRpcResults() {
+ return jobsResults -> {
+ boolean isSuccessful = true;
+ List<RpcError> rpcErrors = new ArrayList<>();
+ List<BatchFailure> batchFailures = new ArrayList<>();
+
+ for (RpcResult<ProcessFlatBatchOutput> jobResult : jobsResults) {
+ if (jobResult != null) {
+ isSuccessful = (isSuccessful && jobResult.isSuccessful());
+ rpcErrors.addAll(jobResult.getErrors());
+ batchFailures.addAll(jobResult.getResult().getBatchFailure());
+ }
+ }
- /**
- * create rpc result honoring success/fail outcomes of arguments
- *
- * @param chainInput left part (chained rpc result)
- * @param input right part (results of current operation)
- * @param <L> chain type
- * @param <R> current operation type
- * @return rpc result with combined status
- */
- private static <L, R> RpcResultBuilder<L> createNextRpcResultBuilder(final RpcResult<L> chainInput, final RpcResult<R> input) {
- return RpcResultBuilder.<L>status(input.isSuccessful() && chainInput.isSuccessful());
+ return RpcResultBuilder.<ProcessFlatBatchOutput>status(isSuccessful)
+ .withRpcErrors(rpcErrors)
+ .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(batchFailures).build())
+ .build();
+ };
}
/**
- * Create rpc result builder with combined status and sum of all errors.
- * <br>
- * Shortcut for {@link #createNextRpcResultBuilder(RpcResult, RpcResult)} and
- * {@link #joinErrors(RpcResultBuilder, RpcResult, RpcResult)}.
- *
- * @param chainInput left part (chained rpc result)
- * @param input right part (results of current operation)
- * @param <L> chain type
- * @param <R> current operation type
- * @return rpc result with combined status and all errors
+ * Merge list of Futures with partial results into one ListenableFuture with single result.
+ * shortcut for {@link #mergeRpcResults()}
+ * @param firedJobs list of ListenableFutures with RPC results {@link ProcessFlatBatchOutput}
+ * @return ListenableFuture of RPC result with combined status and all errors + batch failures
*/
- public static <L, R> RpcResultBuilder<L> mergeRpcResults(final RpcResult<L> chainInput, final RpcResult<R> input) {
- // create rpcResult builder honoring both success/failure of current input and chained input
- final RpcResultBuilder<L> output = FlatBatchUtil.createNextRpcResultBuilder(chainInput, input);
- // join errors
- FlatBatchUtil.joinErrors(output, chainInput, input);
- return output;
+ public static ListenableFuture<RpcResult<ProcessFlatBatchOutput>> mergeJobsResultsFutures(
+ final List<ListenableFuture<RpcResult<ProcessFlatBatchOutput>>> firedJobs) {
+ return Futures.transform(Futures.successfulAsList(firedJobs), mergeRpcResults());
}
+
+
}
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchFlowAdapters
- .createBatchFlowChainingFunction(3).apply(input);
+ .convertBatchFlowResult(3).apply(input);
Assert.assertFalse(rpcResult.isSuccessful());
Assert.assertEquals(1, rpcResult.getErrors().size());
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchFlowAdapters
- .createBatchFlowChainingFunction(0).apply(input);
+ .convertBatchFlowResult(0).apply(input);
Assert.assertTrue(rpcResult.isSuccessful());
Assert.assertEquals(0, rpcResult.getErrors().size());
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchGroupAdapters
- .createBatchGroupChainingFunction(3).apply(input);
+ .convertBatchGroupResult(3).apply(input);
Assert.assertFalse(rpcResult.isSuccessful());
Assert.assertEquals(1, rpcResult.getErrors().size());
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchGroupAdapters
- .createBatchGroupChainingFunction(0).apply(input);
+ .convertBatchGroupResult(0).apply(input);
Assert.assertTrue(rpcResult.isSuccessful());
Assert.assertEquals(0, rpcResult.getErrors().size());
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchMeterAdapters
- .createBatchMeterChainingFunction(3).apply(input);
+ .convertBatchMeterResult(3).apply(input);
Assert.assertFalse(rpcResult.isSuccessful());
Assert.assertEquals(1, rpcResult.getErrors().size());
.build();
final RpcResult<ProcessFlatBatchOutput> rpcResult = FlatBatchMeterAdapters
- .createBatchMeterChainingFunction(0).apply(input);
+ .convertBatchMeterResult(0).apply(input);
Assert.assertTrue(rpcResult.isSuccessful());
Assert.assertEquals(0, rpcResult.getErrors().size());
import org.junit.Test;
import org.opendaylight.openflowplugin.impl.services.batch.BatchPlanStep;
import org.opendaylight.openflowplugin.impl.services.batch.BatchStepType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.ProcessFlatBatchOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.Batch;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.BatchBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.BatchChoice;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.flow._case.FlatBatchUpdateFlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.group._case.FlatBatchUpdateGroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailureBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.batch.failure.batch.item.id.choice.FlatBatchFailureFlowIdCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
}
@Test
- public void testMergeRpcResults() throws Exception {
- final RpcResult<String> rpcResultFailed = RpcResultBuilder.<String>failed()
- .withError(RpcError.ErrorType.APPLICATION, "ut-rpcError").build();
- final RpcResult<String> rpcResultSuccess = RpcResultBuilder.<String>success().build();
+ public void testMergeJobsResultsFutures() throws Exception {
+ final BatchFailure batchFailure = new BatchFailureBuilder()
+ .setBatchOrder(9)
+ .setBatchItemIdChoice(new FlatBatchFailureFlowIdCaseBuilder()
+ .setFlowId(new FlowId("11"))
+ .build())
+ .build();
+
+ final ProcessFlatBatchOutput output = new ProcessFlatBatchOutputBuilder().setBatchFailure(Lists.newArrayList(batchFailure)).build();
+
+ final RpcResult<ProcessFlatBatchOutput> rpcResultFailed = RpcResultBuilder.<ProcessFlatBatchOutput>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "ut-rpcError")
+ .withResult(output).build();
+ final RpcResult<ProcessFlatBatchOutput> rpcResultSuccess = RpcResultBuilder.<ProcessFlatBatchOutput>success()
+ .withResult(new ProcessFlatBatchOutputBuilder().setBatchFailure(new ArrayList<>())).build();
- final RpcResult<String> rpcResult1 = FlatBatchUtil.mergeRpcResults(rpcResultFailed, rpcResultSuccess).build();
+ final RpcResult<ProcessFlatBatchOutput> rpcResult1 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultFailed, rpcResultSuccess));
Assert.assertEquals(1, rpcResult1.getErrors().size());
Assert.assertFalse(rpcResult1.isSuccessful());
- final RpcResult<String> rpcResult2 = FlatBatchUtil.mergeRpcResults(rpcResultFailed, rpcResultFailed).build();
+ final RpcResult<ProcessFlatBatchOutput> rpcResult2 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultFailed, rpcResultFailed));
Assert.assertEquals(2, rpcResult2.getErrors().size());
Assert.assertFalse(rpcResult2.isSuccessful());
- final RpcResult<String> rpcResult3 = FlatBatchUtil.mergeRpcResults(rpcResultSuccess, rpcResultSuccess).build();
+ final RpcResult<ProcessFlatBatchOutput> rpcResult3 = FlatBatchUtil.mergeRpcResults().apply(Lists.newArrayList(rpcResultSuccess, rpcResultSuccess));
Assert.assertEquals(0, rpcResult3.getErrors().size());
Assert.assertTrue(rpcResult3.isSuccessful());
}