*/
package org.opendaylight.openflowplugin.impl.util;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierOutput;
* @param <T> type of input future
* @param input future to chain barrier to
* @param nodeRef target device
- * @param transactionService barrier service
+ * @param sendBarrier barrier service
* @param compositeTransform composite transform
* @return future holding both results (input and of the barrier)
*/
- public static <T> ListenableFuture<RpcResult<T>> chainBarrier(
- final ListenableFuture<RpcResult<T>> input, final NodeRef nodeRef,
- final FlowCapableTransactionService transactionService,
+ public static <T> ListenableFuture<RpcResult<T>> chainBarrier(final ListenableFuture<RpcResult<T>> input,
+ final NodeRef nodeRef, final SendBarrier sendBarrier,
final Function<Pair<RpcResult<T>, RpcResult<SendBarrierOutput>>, RpcResult<T>> compositeTransform) {
- final MutablePair<RpcResult<T>, RpcResult<SendBarrierOutput>> resultPair = new MutablePair<>();
+ final var resultPair = new MutablePair<RpcResult<T>, RpcResult<SendBarrierOutput>>();
// store input result and append barrier
- final ListenableFuture<RpcResult<SendBarrierOutput>> barrierResult = Futures.transformAsync(input,
- interInput -> {
- resultPair.setLeft(interInput);
- final SendBarrierInput barrierInput = createSendBarrierInput(nodeRef);
- return transactionService.sendBarrier(barrierInput);
- }, MoreExecutors.directExecutor());
+ final var barrierResult = Futures.transformAsync(input, interInput -> {
+ resultPair.setLeft(interInput);
+ return sendBarrier.invoke(createSendBarrierInput(nodeRef));
+ }, MoreExecutors.directExecutor());
// store barrier result and return initiated pair
- final ListenableFuture<Pair<RpcResult<T>, RpcResult<SendBarrierOutput>>> compositeResult = Futures.transform(
- barrierResult,
- input1 -> {
- resultPair.setRight(input1);
- return resultPair;
- }, MoreExecutors.directExecutor());
+ final var compositeResult = Futures.transform(barrierResult, input1 -> {
+ resultPair.setRight(input1);
+ return resultPair;
+ }, MoreExecutors.directExecutor());
// append assembling transform to barrier result
return Futures.transform(compositeResult, compositeTransform, MoreExecutors.directExecutor());
}
* @param nodeRef rpc routing context
* @return input for {@link FlowCapableTransactionService#sendBarrier(SendBarrierInput)}
*/
- public static SendBarrierInput createSendBarrierInput(final NodeRef nodeRef) {
- return new SendBarrierInputBuilder()
- .setNode(nodeRef)
- .build();
+ @VisibleForTesting
+ static SendBarrierInput createSendBarrierInput(final NodeRef nodeRef) {
+ return new SendBarrierInputBuilder().setNode(nodeRef).build();
}
}