BUG-5574: sal-flows-batch proposal 07/36307/5
authorMichal Rehak <mirehak@cisco.com>
Mon, 14 Mar 2016 15:44:21 +0000 (16:44 +0100)
committerandrej.leitner <anleitne@cisco.com>
Tue, 19 Apr 2016 09:04:26 +0000 (11:04 +0200)
    - added common model for attaching barrier
    - added bulk API implementation for flows
      - with parametrized barrier-after support
    - added tests

Change-Id: I4e941d3654dc142e1d1d64a2258e629016e95a0e
Signed-off-by: Michal Rehak <mirehak@cisco.com>
(cherry picked from commit 79e8e834f0cbbccfbff99e51e4ddd854dd812d19)
Signed-off-by: andrej.leitner <anleitne@cisco.com>
model/model-flow-service/src/main/yang/barrier-common.yang [new file with mode: 0644]
model/model-flow-service/src/main/yang/batch-common.yang [new file with mode: 0644]
model/model-flow-service/src/main/yang/sal-flows-batch.yang [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImpl.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/BarrierUtil.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/FlowUtil.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/PathUtil.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImplTest.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/BarrierUtilTest.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/FlowUtilTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/PathUtilTest.java [new file with mode: 0644]

diff --git a/model/model-flow-service/src/main/yang/barrier-common.yang b/model/model-flow-service/src/main/yang/barrier-common.yang
new file mode 100644 (file)
index 0000000..168776e
--- /dev/null
@@ -0,0 +1,18 @@
+module barrier-common {
+    namespace "urn:opendaylight:service:barrier:common";
+    prefix barrier-common;
+
+    description "Openflow barrier for services - common groupings.";
+
+    revision "2016-03-15" {
+        description "Initial revision of batch common groupings.";
+    }
+
+    grouping barrier-suffix {
+        description "Flag indicating that barrier will be attached after some service-specific action.";
+
+        leaf barrier-after {
+            type boolean;
+        }
+    }
+}
diff --git a/model/model-flow-service/src/main/yang/batch-common.yang b/model/model-flow-service/src/main/yang/batch-common.yang
new file mode 100644 (file)
index 0000000..feb2082
--- /dev/null
@@ -0,0 +1,17 @@
+module batch-common {
+    namespace "urn:opendaylight:service:batch:common";
+    prefix batch-common;
+
+    description "Openflow batch services - common groupings.";
+
+    revision "2016-03-22" {
+        description "Initial revision of batch common groupings.";
+    }
+
+    grouping batch-order-grouping {
+        description "provide unified batch order value";
+        leaf batch-order {
+            type uint16;
+        }
+    }
+}
diff --git a/model/model-flow-service/src/main/yang/sal-flows-batch.yang b/model/model-flow-service/src/main/yang/sal-flows-batch.yang
new file mode 100644 (file)
index 0000000..b7e5555
--- /dev/null
@@ -0,0 +1,103 @@
+module sal-flows-batch {
+    namespace "urn:opendaylight:flows:service";
+    prefix flows;
+
+    import barrier-common {prefix bc;revision-date "2016-03-15";}
+    import batch-common {prefix batch;revision-date "2016-03-22";}
+    import opendaylight-inventory {prefix inv;revision-date "2013-08-19";}
+    import opendaylight-flow-types {prefix types;revision-date "2013-10-26";}
+    import flow-node-inventory {prefix flow-inv; revision-date "2013-08-19";}
+
+    description "Openflow batch flow management.";
+
+    revision "2016-03-14" {
+        description "Initial revision of batch flow service";
+    }
+
+    grouping batch-flow-id-grouping {
+        description "General flow-id leaf.";
+
+        leaf flow-id {
+            type flow-inv:flow-id;
+        }
+    }
+
+    grouping batch-flow-input-grouping {
+        description "Openflow flow structure suitable for batch rpc input.";
+
+        uses batch-flow-id-grouping;
+        uses types:flow;
+    }
+
+    grouping batch-flow-input-update-grouping {
+        description "Openflow flow structure suitable for batch rpc input.";
+
+        uses batch-flow-id-grouping;
+        container original-batched-flow {
+            uses types:flow;
+        }
+        container updated-batched-flow {
+            uses types:flow;
+        }
+    }
+
+    grouping batch-flow-output-list-grouping {
+        description "Openflow flow list suitable for batch rpc output.";
+
+         list batch-failed-flows-output {
+            key batch-order;
+
+            uses batch:batch-order-grouping;
+            uses batch-flow-id-grouping;
+         }
+    }
+
+
+    rpc add-flows-batch {
+        description "Batch adding flows to openflow device.";
+        input {
+            uses "inv:node-context-ref";
+
+            list batch-add-flows {
+                key flow-id;
+                uses batch-flow-input-grouping;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-flow-output-list-grouping;
+        }
+    }
+
+    rpc remove-flows-batch {
+        description "Batch removing flows from openflow device.";
+        input {
+            uses "inv:node-context-ref";
+
+            list batch-remove-flows {
+                key flow-id;
+                uses batch-flow-input-grouping;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-flow-output-list-grouping;
+        }
+    }
+
+    rpc update-flows-batch {
+        description "Batch updating flows on openflow device.";
+        input {
+            uses "inv:node-context-ref";
+
+            list batch-update-flows {
+                key flow-id;
+                uses batch-flow-input-update-grouping;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-flow-output-list-grouping;
+        }
+    }
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImpl.java
new file mode 100644 (file)
index 0000000..635d86e
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
+import org.opendaylight.openflowplugin.impl.util.FlowUtil;
+import org.opendaylight.openflowplugin.impl.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowInputUpdateGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.SalFlowsBatchService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.update.flows.batch.input.BatchUpdateFlows;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * default implementation of {@link SalFlowsBatchService} - delegates work to {@link SalFlowService}
+ */
+public class SalFlowsBatchServiceImpl implements SalFlowsBatchService {
+    private static final Logger LOG = LoggerFactory.getLogger(SalFlowsBatchServiceImpl.class);
+
+    private final SalFlowService salFlowService;
+    private final FlowCapableTransactionService transactionService;
+
+    public SalFlowsBatchServiceImpl(final SalFlowService salFlowService,
+                                    final FlowCapableTransactionService transactionService) {
+        this.salFlowService = Preconditions.checkNotNull(salFlowService, "delegate flow service must not be null");
+        this.transactionService = Preconditions.checkNotNull(transactionService, "delegate transaction service must not be null");
+    }
+
+    @Override
+    public Future<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBatch(final RemoveFlowsBatchInput input) {
+        LOG.trace("Removing flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveFlows().size());
+        final ArrayList<ListenableFuture<RpcResult<RemoveFlowOutput>>> resultsLot = new ArrayList<>();
+        for (BatchFlowInputGrouping batchFlow : input.getBatchRemoveFlows()) {
+            final RemoveFlowInput removeFlowInput = new RemoveFlowInputBuilder(batchFlow)
+                    .setFlowRef(createFlowRef(input.getNode(), batchFlow))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.removeFlow(removeFlowInput)));
+        }
+
+        final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
+                Futures.transform(Futures.successfulAsList(resultsLot),
+                        FlowUtil.<RemoveFlowOutput>createCumulatingFunction(input.getBatchRemoveFlows()));
+
+        ListenableFuture<RpcResult<RemoveFlowsBatchOutput>> removeFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_REMOVE_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            removeFlowsBulkFuture = BarrierUtil.chainBarrier(removeFlowsBulkFuture, input.getNode(),
+                    transactionService, FlowUtil.FLOW_REMOVE_COMPOSING_TRANSFORM);
+        }
+
+        return removeFlowsBulkFuture;
+    }
+
+    @Override
+    public Future<RpcResult<AddFlowsBatchOutput>> addFlowsBatch(final AddFlowsBatchInput input) {
+        LOG.trace("Adding flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddFlows().size());
+        final ArrayList<ListenableFuture<RpcResult<AddFlowOutput>>> resultsLot = new ArrayList<>();
+        for (BatchFlowInputGrouping batchFlow : input.getBatchAddFlows()) {
+            final AddFlowInput addFlowInput = new AddFlowInputBuilder(batchFlow)
+                    .setFlowRef(createFlowRef(input.getNode(), batchFlow))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.addFlow(addFlowInput)));
+        }
+
+        final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
+                Futures.transform(Futures.successfulAsList(resultsLot),
+                        FlowUtil.<AddFlowOutput>createCumulatingFunction(input.getBatchAddFlows()));
+
+        ListenableFuture<RpcResult<AddFlowsBatchOutput>> addFlowsBulkFuture =
+                Futures.transform(commonResult, FlowUtil.FLOW_ADD_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            addFlowsBulkFuture = BarrierUtil.chainBarrier(addFlowsBulkFuture, input.getNode(),
+                    transactionService, FlowUtil.FLOW_ADD_COMPOSING_TRANSFORM);
+        }
+
+        return addFlowsBulkFuture;
+    }
+
+    private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputGrouping batchFlow) {
+        return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
+                batchFlow.getTableId(), batchFlow.getFlowId());
+    }
+
+    private static FlowRef createFlowRef(final NodeRef nodeRef, final BatchFlowInputUpdateGrouping batchFlow) {
+        return FlowUtil.buildFlowPath((InstanceIdentifier<Node>) nodeRef.getValue(),
+                batchFlow.getOriginalBatchedFlow().getTableId(), batchFlow.getFlowId());
+    }
+
+    @Override
+    public Future<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBatch(final UpdateFlowsBatchInput input) {
+        LOG.trace("Updating flows @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchUpdateFlows().size());
+        final ArrayList<ListenableFuture<RpcResult<UpdateFlowOutput>>> resultsLot = new ArrayList<>();
+        for (BatchUpdateFlows batchFlow : input.getBatchUpdateFlows()) {
+            final UpdateFlowInput updateFlowInput = new UpdateFlowInputBuilder(input)
+                    .setOriginalFlow(new OriginalFlowBuilder(batchFlow.getOriginalBatchedFlow()).build())
+                    .setUpdatedFlow(new UpdatedFlowBuilder(batchFlow.getUpdatedBatchedFlow()).build())
+                    .setFlowRef(createFlowRef(input.getNode(), batchFlow))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salFlowService.updateFlow(updateFlowInput)));
+        }
+
+        final ListenableFuture<RpcResult<List<BatchFailedFlowsOutput>>> commonResult =
+                Futures.transform(Futures.successfulAsList(resultsLot), FlowUtil.<UpdateFlowOutput>createCumulatingFunction(input.getBatchUpdateFlows()));
+
+        ListenableFuture<RpcResult<UpdateFlowsBatchOutput>> updateFlowsBulkFuture = Futures.transform(commonResult, FlowUtil.FLOW_UPDATE_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            updateFlowsBulkFuture = BarrierUtil.chainBarrier(updateFlowsBulkFuture, input.getNode(),
+                    transactionService, FlowUtil.FLOW_UPDATE_COMPOSING_TRANSFORM);
+        }
+
+        return updateFlowsBulkFuture;
+    }
+
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/BarrierUtil.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/BarrierUtil.java
new file mode 100644 (file)
index 0000000..209ff92
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * provides barrier message chaining and factory methods
+ */
+public final class BarrierUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BarrierUtil.class);
+
+
+    private BarrierUtil() {
+        throw new IllegalStateException("This class should not be instantiated.");
+    }
+
+
+    /**
+     * chain a barrier message - regardless of previous result and use given {@link Function} to combine
+     * original result and barrier result
+     *
+     * @param <T>                type of input future
+     * @param input              future to chain barrier to
+     * @param nodeRef            target device
+     * @param transactionService barrier service
+     * @param compositeTransform
+     * @return future holding both results (input and of the barrier)
+     */
+    public static <T> ListenableFuture<RpcResult<T>> chainBarrier(
+            final ListenableFuture<RpcResult<T>> input, final NodeRef nodeRef,
+            final FlowCapableTransactionService transactionService,
+            final Function<Pair<RpcResult<T>, RpcResult<Void>>, RpcResult<T>> compositeTransform) {
+        final MutablePair<RpcResult<T>, RpcResult<Void>> resultPair = new MutablePair<>();
+
+        // store input result and append barrier
+        final ListenableFuture<RpcResult<Void>> barrierResult = Futures.transform(input,
+                new AsyncFunction<RpcResult<T>, RpcResult<Void>>() {
+                    @Override
+                    public ListenableFuture<RpcResult<Void>> apply(@Nullable final RpcResult<T> interInput) throws Exception {
+                        resultPair.setLeft(interInput);
+                        final SendBarrierInput barrierInput = createSendBarrierInput(nodeRef);
+                        return JdkFutureAdapters.listenInPoolThread(transactionService.sendBarrier(barrierInput));
+                    }
+                });
+        // store barrier result and return initiated pair
+        final ListenableFuture<Pair<RpcResult<T>, RpcResult<Void>>> compositeResult = Futures.transform(
+                barrierResult, new Function<RpcResult<Void>, Pair<RpcResult<T>, RpcResult<Void>>>() {
+                    @Nullable
+                    @Override
+                    public Pair<RpcResult<T>, RpcResult<Void>> apply(@Nullable final RpcResult<Void> input) {
+                        resultPair.setRight(input);
+                        return resultPair;
+                    }
+                });
+        // append assembling transform to barrier result
+        return Futures.transform(compositeResult, compositeTransform);
+    }
+
+    /**
+     * @param nodeRef rpc routing context
+     * @return input for {@link FlowCapableTransactionService#sendBarrier(SendBarrierInput)}
+     */
+    public static SendBarrierInput createSendBarrierInput(final NodeRef nodeRef) {
+        return new SendBarrierInputBuilder()
+                .setNode(nodeRef)
+                .build();
+    }
+}
index 58bf6d81d44e768035a46840e3aa26947206e1c7..a16e27a7db2b69042232e5e8e2ec2fca117800c3 100644 (file)
@@ -8,8 +8,40 @@
 
 package org.opendaylight.openflowplugin.impl.util;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowIdGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowOutputListGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -21,17 +53,203 @@ public final class FlowUtil {
     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
     private static final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
     private static final Logger LOG = LoggerFactory.getLogger(FlowUtil.class);
+    private static final RpcResultBuilder<List<BatchFailedFlowsOutput>> SUCCESSFUL_FLOW_OUTPUT_RPC_RESULT =
+            RpcResultBuilder.success(Collections.<BatchFailedFlowsOutput>emptyList());
 
+    /** Attach barrier response to given {@link RpcResult}&lt;RemoveFlowsBatchOutput&gt; */
+    public static final Function<Pair<RpcResult<RemoveFlowsBatchOutput>, RpcResult<Void>>, RpcResult<RemoveFlowsBatchOutput>>
+            FLOW_REMOVE_COMPOSING_TRANSFORM = createComposingFunction();
+
+    /** Attach barrier response to given {@link RpcResult}&lt;AddFlowsBatchOutput&gt; */
+    public static final Function<Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>>, RpcResult<AddFlowsBatchOutput>>
+            FLOW_ADD_COMPOSING_TRANSFORM = createComposingFunction();
+
+    /** Attach barrier response to given {@link RpcResult}&lt;UpdateFlowsBatchOutput&gt; */
+    public static final Function<Pair<RpcResult<UpdateFlowsBatchOutput>, RpcResult<Void>>, RpcResult<UpdateFlowsBatchOutput>>
+            FLOW_UPDATE_COMPOSING_TRANSFORM = createComposingFunction();
+
+    /**
+     * Gather errors into collection and wrap it into {@link RpcResult} and propagate all {@link RpcError}
+     */
+    public static final Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<RemoveFlowsBatchOutput>> FLOW_REMOVE_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<RemoveFlowsBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<RemoveFlowsBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedFlowsOutput>> batchFlowsCumulativeResult) {
+                    final RemoveFlowsBatchOutput batchOutput = new RemoveFlowsBatchOutputBuilder()
+                            .setBatchFailedFlowsOutput(batchFlowsCumulativeResult.getResult()).build();
+
+                    final RpcResultBuilder<RemoveFlowsBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchFlowsCumulativeResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
+
+    /**
+     * Gather errors into collection and wrap it into {@link RpcResult} and propagate all {@link RpcError}
+     */
+    public static final Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<AddFlowsBatchOutput>> FLOW_ADD_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<AddFlowsBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<AddFlowsBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedFlowsOutput>> batchFlowsCumulativeResult) {
+                    final AddFlowsBatchOutput batchOutput = new AddFlowsBatchOutputBuilder()
+                            .setBatchFailedFlowsOutput(batchFlowsCumulativeResult.getResult()).build();
+
+                    final RpcResultBuilder<AddFlowsBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchFlowsCumulativeResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
+
+    /**
+     * Gather errors into collection and wrap it into {@link RpcResult} and propagate all {@link RpcError}
+     */
+    public static final Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<UpdateFlowsBatchOutput>> FLOW_UPDATE_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedFlowsOutput>>, RpcResult<UpdateFlowsBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<UpdateFlowsBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedFlowsOutput>> batchFlowsCumulativeResult) {
+                    final UpdateFlowsBatchOutput batchOutput = new UpdateFlowsBatchOutputBuilder()
+                            .setBatchFailedFlowsOutput(batchFlowsCumulativeResult.getResult()).build();
+
+                    final RpcResultBuilder<UpdateFlowsBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchFlowsCumulativeResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
 
     private FlowUtil() {
         throw new IllegalStateException("This class should not be instantiated.");
     }
 
+    /**
+     * Wrap given list of problematic flow-ids into {@link RpcResult} of given type.
+     *
+     * @param batchFlowsCumulativeResult list of ids failed flows
+     * @param batchOutput
+     * @param <T>                        flow operation type
+     * @return batch flow operation output of given type containing list of flow-ids and corresponding success flag
+     */
+    private static <T extends BatchFlowOutputListGrouping>
+    RpcResultBuilder<T> createCumulativeRpcResult(final @Nullable RpcResult<List<BatchFailedFlowsOutput>> batchFlowsCumulativeResult,
+                                                  final T batchOutput) {
+        final RpcResultBuilder<T> resultBld;
+        if (batchFlowsCumulativeResult.isSuccessful()) {
+            resultBld = RpcResultBuilder.success(batchOutput);
+        } else {
+            resultBld = RpcResultBuilder.failed();
+            resultBld.withResult(batchOutput)
+                    .withRpcErrors(batchFlowsCumulativeResult.getErrors());
+        }
+        return resultBld;
+    }
+
     public static FlowId createAlienFlowId(final short tableId) {
         final StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
                 .append(tableId).append('-').append(unaccountedFlowsCounter.incrementAndGet());
-        String alienId =  sBuilder.toString();
+        String alienId = sBuilder.toString();
         return new FlowId(alienId);
 
     }
+
+    /**
+     * Factory method: create {@link Function} which attaches barrier response to given {@link RpcResult}&lt;T&gt;
+     * and changes success flag if needed.
+     * <br>
+     * Original rpcResult is the {@link Pair#getLeft()} and barrier result is the {@link Pair#getRight()}.
+     *
+     * @param <T> type of rpcResult value
+     * @return reusable static function
+     */
+    @VisibleForTesting
+    static <T extends BatchFlowOutputListGrouping>
+    Function<Pair<RpcResult<T>, RpcResult<Void>>, RpcResult<T>> createComposingFunction() {
+        return new Function<Pair<RpcResult<T>, RpcResult<Void>>, RpcResult<T>>() {
+            @Nullable
+            @Override
+            public RpcResult<T> apply(@Nullable final Pair<RpcResult<T>, RpcResult<Void>> input) {
+                final RpcResultBuilder<T> resultBld;
+                if (input.getLeft().isSuccessful() && input.getRight().isSuccessful()) {
+                    resultBld = RpcResultBuilder.success();
+                } else {
+                    resultBld = RpcResultBuilder.failed();
+                }
+
+                final ArrayList<RpcError> rpcErrors = new ArrayList<>(input.getLeft().getErrors());
+                rpcErrors.addAll(input.getRight().getErrors());
+                resultBld.withRpcErrors(rpcErrors);
+
+                resultBld.withResult(input.getLeft().getResult());
+
+                return resultBld.build();
+            }
+        };
+    }
+
+    /**
+     * @param nodePath path to {@link Node}
+     * @param tableId  path to {@link Table} under {@link Node}
+     * @param flowId   path to {@link Flow} under {@link Table}
+     * @return instance identifier assembled for given node, table and flow
+     */
+    public static FlowRef buildFlowPath(final InstanceIdentifier<Node> nodePath,
+                                        final short tableId, final FlowId flowId) {
+        final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = nodePath
+                .augmentation(FlowCapableNode.class)
+                .child(Table.class, new TableKey(tableId))
+                .child(Flow.class, new FlowKey(new FlowId(flowId)));
+
+        return new FlowRef(flowPath);
+    }
+
+    /**
+     * Factory method: creates {@link Function} which keeps info of original inputs (passed to flow-rpc) and processes
+     * list of all flow-rpc results.
+     *
+     * @param inputBatchFlows collection of problematic flow-ids wrapped in container of given type &lt;O&gt;
+     * @param <O>             result container type
+     * @return static reusable function
+     */
+    public static <O> Function<List<RpcResult<O>>, RpcResult<List<BatchFailedFlowsOutput>>> createCumulatingFunction(
+            final List<? extends BatchFlowIdGrouping> inputBatchFlows) {
+        return new Function<List<RpcResult<O>>, RpcResult<List<BatchFailedFlowsOutput>>>() {
+            @Nullable
+            @Override
+            public RpcResult<List<BatchFailedFlowsOutput>> apply(@Nullable final List<RpcResult<O>> innerInput) {
+                final int sizeOfFutures = innerInput.size();
+                final int sizeOfInputBatch = inputBatchFlows.size();
+                Preconditions.checkArgument(sizeOfFutures == sizeOfInputBatch,
+                        "wrong amount of returned futures: {} <> {}", sizeOfFutures, sizeOfInputBatch);
+
+                final ArrayList<BatchFailedFlowsOutput> batchFlows = new ArrayList<>(sizeOfFutures);
+                final Iterator<? extends BatchFlowIdGrouping> batchFlowIterator = inputBatchFlows.iterator();
+
+                Collection<RpcError> flowErrors = new ArrayList<>(sizeOfFutures);
+
+                int batchOrder = 0;
+                for (RpcResult<O> flowModOutput : innerInput) {
+                    final FlowId flowId = batchFlowIterator.next().getFlowId();
+
+                    if (!flowModOutput.isSuccessful()) {
+                        batchFlows.add(new BatchFailedFlowsOutputBuilder()
+                                .setFlowId(flowId)
+                                .setBatchOrder(batchOrder)
+                                .build());
+                        flowErrors.addAll(flowModOutput.getErrors());
+                    }
+                    batchOrder++;
+                }
+
+                final RpcResultBuilder<List<BatchFailedFlowsOutput>> resultBuilder;
+                if (!flowErrors.isEmpty()) {
+                    resultBuilder = RpcResultBuilder.<List<BatchFailedFlowsOutput>>failed()
+                            .withRpcErrors(flowErrors).withResult(batchFlows);
+                } else {
+                    resultBuilder = SUCCESSFUL_FLOW_OUTPUT_RPC_RESULT;
+                }
+                return resultBuilder.build();
+            }
+        };
+    }
 }
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/PathUtil.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/PathUtil.java
new file mode 100644 (file)
index 0000000..36ee174
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Purpose: utility class providing path and {@link InstanceIdentifier} tools
+ */
+public class PathUtil {
+
+    private PathUtil() {
+        throw new IllegalStateException("This class should not be instantiated.");
+    }
+
+
+    /**
+     * @param input reference to {@link Node}
+     * @return node-id from given reference
+     */
+    public static NodeId extractNodeId(final NodeRef input) {
+        return input.getValue().firstKeyOf(Node.class).getId();
+    }
+}
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalFlowsBatchServiceImplTest.java
new file mode 100644 (file)
index 0000000..15a9aa2
--- /dev/null
@@ -0,0 +1,339 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.Future;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.RemoveFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.UpdateFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.add.flows.batch.input.BatchAddFlows;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.add.flows.batch.input.BatchAddFlowsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.OriginalBatchedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.input.update.grouping.UpdatedBatchedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.remove.flows.batch.input.BatchRemoveFlows;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.remove.flows.batch.input.BatchRemoveFlowsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.update.flows.batch.input.BatchUpdateFlows;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.update.flows.batch.input.BatchUpdateFlowsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link SalFlowsBatchServiceImpl}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SalFlowsBatchServiceImplTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SalFlowsBatchServiceImplTest.class);
+
+    public static final NodeId NODE_ID = new NodeId("ut-dummy-node");
+    public static final NodeKey NODE_KEY = new NodeKey(NODE_ID);
+    public static final NodeRef NODE_REF = new NodeRef(InstanceIdentifier.create(Nodes.class).child(Node.class, NODE_KEY));
+
+    @Mock
+    private SalFlowService salFlowService;
+    @Mock
+    private FlowCapableTransactionService transactionService;
+    @Captor
+    private ArgumentCaptor<RemoveFlowInput> removeFlowInputCpt;
+    @Captor
+    private ArgumentCaptor<UpdateFlowInput> updateFlowInputCpt;
+    @Captor
+    private ArgumentCaptor<AddFlowInput> addFlowInputCpt;
+
+    private SalFlowsBatchServiceImpl salFlowsBatchService;
+    public static final String FLOW_ID_VALUE_1 = "ut-dummy-flow1";
+    public static final String FLOW_ID_VALUE_2 = "ut-dummy-flow2";
+
+    @Before
+    public void setUp() throws Exception {
+        salFlowsBatchService = new SalFlowsBatchServiceImpl(salFlowService, transactionService);
+
+        Mockito.when(transactionService.sendBarrier(Matchers.<SendBarrierInput>any()))
+                .thenReturn(RpcResultBuilder.<Void>success().buildFuture());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Mockito.verifyNoMoreInteractions(salFlowService, transactionService);
+    }
+
+    @Test
+    public void testRemoveFlowsBatch_success() throws Exception {
+        Mockito.when(salFlowService.removeFlow(Matchers.<RemoveFlowInput>any()))
+                .thenReturn(RpcResultBuilder.success(new RemoveFlowOutputBuilder().build())
+                        .buildFuture());
+
+        final String flow1IdValue = "ut-dummy-flow1";
+        final String flow2IdValue = "ut-dummy-flow2";
+        final BatchRemoveFlows batchFlow1 = createEmptyBatchRemoveFlow(flow1IdValue, 42);
+        final BatchRemoveFlows batchFlow2 = createEmptyBatchRemoveFlow(flow2IdValue, 43);
+
+        final RemoveFlowsBatchInput input = new RemoveFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchRemoveFlows(Lists.newArrayList(batchFlow1, batchFlow2))
+                .build();
+
+        final Future<RpcResult<RemoveFlowsBatchOutput>> resultFuture = salFlowsBatchService.removeFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        final RpcResult<RemoveFlowsBatchOutput> rpcResult = resultFuture.get();
+        Assert.assertTrue(rpcResult.isSuccessful());
+        final RemoveFlowsBatchOutput result = rpcResult.getResult();
+        Assert.assertEquals(0, result.getBatchFailedFlowsOutput().size());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+
+        inOrder.verify(salFlowService, Mockito.times(2)).removeFlow(removeFlowInputCpt.capture());
+        final List<RemoveFlowInput> allValues = removeFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(1).getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testRemoveFlowsBatch_failed() throws Exception {
+        Mockito.when(salFlowService.removeFlow(Matchers.<RemoveFlowInput>any()))
+                .thenReturn(RpcResultBuilder.<RemoveFlowOutput>failed()
+                        .withError(RpcError.ErrorType.APPLICATION, "flow-remove-fail-1")
+                        .buildFuture());
+
+        final BatchRemoveFlows batchFlow1 = createEmptyBatchRemoveFlow(FLOW_ID_VALUE_1, 42);
+        final BatchRemoveFlows batchFlow2 = createEmptyBatchRemoveFlow(FLOW_ID_VALUE_2, 43);
+
+        final RemoveFlowsBatchInput input = new RemoveFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchRemoveFlows(Lists.newArrayList(batchFlow1, batchFlow2))
+                .build();
+
+        final Future<RpcResult<RemoveFlowsBatchOutput>> resultFuture = salFlowsBatchService.removeFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        final RpcResult<RemoveFlowsBatchOutput> rpcResult = resultFuture.get();
+        Assert.assertFalse(rpcResult.isSuccessful());
+        final RemoveFlowsBatchOutput result = rpcResult.getResult();
+        Assert.assertEquals(2, result.getBatchFailedFlowsOutput().size());
+        Assert.assertEquals(FLOW_ID_VALUE_1, result.getBatchFailedFlowsOutput().get(0).getFlowId().getValue());
+        Assert.assertEquals(FLOW_ID_VALUE_2, result.getBatchFailedFlowsOutput().get(1).getFlowId().getValue());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+
+        inOrder.verify(salFlowService, Mockito.times(2)).removeFlow(removeFlowInputCpt.capture());
+        final List<RemoveFlowInput> allValues = removeFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(1).getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    private static BatchAddFlows createEmptyBatchAddFlow(final String flowIdValue, final int priority) {
+        return new BatchAddFlowsBuilder()
+                .setFlowId(new FlowId(flowIdValue))
+                .setPriority(priority)
+                .setMatch(new MatchBuilder().build())
+                .setTableId((short) 0)
+                .build();
+    }
+
+    private static BatchRemoveFlows createEmptyBatchRemoveFlow(final String flowIdValue, final int priority) {
+        return new BatchRemoveFlowsBuilder()
+                .setFlowId(new FlowId(flowIdValue))
+                .setPriority(priority)
+                .setMatch(new MatchBuilder().build())
+                .setTableId((short) 0)
+                .build();
+    }
+
+    private static BatchUpdateFlows createEmptyBatchUpdateFlow(final String flowIdValue, final int priority) {
+        final BatchAddFlows emptyOriginalFlow = createEmptyBatchAddFlow(flowIdValue, priority);
+        final BatchAddFlows emptyUpdatedFlow = createEmptyBatchAddFlow(flowIdValue, priority + 1);
+        return new BatchUpdateFlowsBuilder()
+                .setFlowId(new FlowId(flowIdValue))
+                .setOriginalBatchedFlow(new OriginalBatchedFlowBuilder(emptyOriginalFlow).build())
+                .setUpdatedBatchedFlow(new UpdatedBatchedFlowBuilder(emptyUpdatedFlow).build())
+                .build();
+    }
+
+    @Test
+    public void testAddFlowsBatch_success() throws Exception {
+        Mockito.when(salFlowService.addFlow(Matchers.<AddFlowInput>any()))
+                .thenReturn(RpcResultBuilder.success(new AddFlowOutputBuilder().build()).buildFuture());
+
+        final AddFlowsBatchInput input = new AddFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchAddFlows(Lists.newArrayList(
+                        createEmptyBatchAddFlow("ut-dummy-flow1", 42),
+                        createEmptyBatchAddFlow("ut-dummy-flow2", 43)))
+                .build();
+
+        final Future<RpcResult<AddFlowsBatchOutput>> resultFuture = salFlowsBatchService.addFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertTrue(resultFuture.get().isSuccessful());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+
+        inOrder.verify(salFlowService, Mockito.times(2)).addFlow(addFlowInputCpt.capture());
+        final List<AddFlowInput> allValues = addFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(1).getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testAddFlowsBatch_failed() throws Exception {
+        Mockito.when(salFlowService.addFlow(Matchers.<AddFlowInput>any()))
+                .thenReturn(RpcResultBuilder.<AddFlowOutput>failed().withError(RpcError.ErrorType.APPLICATION, "ut-groupAddError")
+                        .buildFuture());
+
+        final AddFlowsBatchInput input = new AddFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchAddFlows(Lists.newArrayList(
+                        createEmptyBatchAddFlow(FLOW_ID_VALUE_1, 42),
+                        createEmptyBatchAddFlow(FLOW_ID_VALUE_2, 43)))
+                .build();
+
+        final Future<RpcResult<AddFlowsBatchOutput>> resultFuture = salFlowsBatchService.addFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertEquals(2, resultFuture.get().getResult().getBatchFailedFlowsOutput().size());
+        Assert.assertEquals(FLOW_ID_VALUE_1, resultFuture.get().getResult().getBatchFailedFlowsOutput().get(0).getFlowId().getValue());
+        Assert.assertEquals(FLOW_ID_VALUE_2, resultFuture.get().getResult().getBatchFailedFlowsOutput().get(1).getFlowId().getValue());
+        Assert.assertEquals(2, resultFuture.get().getErrors().size());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+
+        inOrder.verify(salFlowService, Mockito.times(2)).addFlow(addFlowInputCpt.capture());
+        final List<AddFlowInput> allValues = addFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(1).getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testUpdateFlowsBatch_success() throws Exception {
+        Mockito.when(salFlowService.updateFlow(Matchers.<UpdateFlowInput>any()))
+                .thenReturn(RpcResultBuilder.success(new UpdateFlowOutputBuilder().build()).buildFuture());
+
+        final UpdateFlowsBatchInput input = new UpdateFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchUpdateFlows(Lists.newArrayList(
+                        createEmptyBatchUpdateFlow(FLOW_ID_VALUE_1, 42),
+                        createEmptyBatchUpdateFlow(FLOW_ID_VALUE_2, 44)))
+                .build();
+
+        final Future<RpcResult<UpdateFlowsBatchOutput>> resultFuture = salFlowsBatchService.updateFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertTrue(resultFuture.get().isSuccessful());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+
+        inOrder.verify(salFlowService, Mockito.times(2)).updateFlow(updateFlowInputCpt.capture());
+        final List<UpdateFlowInput> allValues = updateFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getOriginalFlow().getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(0).getUpdatedFlow().getPriority().longValue());
+        Assert.assertEquals(44, allValues.get(1).getOriginalFlow().getPriority().longValue());
+        Assert.assertEquals(45, allValues.get(1).getUpdatedFlow().getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testUpdateFlowsBatch_failure() throws Exception {
+        Mockito.when(salFlowService.updateFlow(Matchers.<UpdateFlowInput>any()))
+                .thenReturn(RpcResultBuilder.<UpdateFlowOutput>failed().withError(RpcError.ErrorType.APPLICATION, "ut-flowUpdateError")
+                        .buildFuture());
+
+        final UpdateFlowsBatchInput input = new UpdateFlowsBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchUpdateFlows(Lists.newArrayList(
+                        createEmptyBatchUpdateFlow(FLOW_ID_VALUE_1, 42),
+                        createEmptyBatchUpdateFlow(FLOW_ID_VALUE_2, 44)))
+                .build();
+
+        final Future<RpcResult<UpdateFlowsBatchOutput>> resultFuture = salFlowsBatchService.updateFlowsBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertEquals(2, resultFuture.get().getResult().getBatchFailedFlowsOutput().size());
+        Assert.assertEquals(FLOW_ID_VALUE_1, resultFuture.get().getResult().getBatchFailedFlowsOutput().get(0).getFlowId().getValue());
+        Assert.assertEquals(FLOW_ID_VALUE_2, resultFuture.get().getResult().getBatchFailedFlowsOutput().get(1).getFlowId().getValue());
+        Assert.assertEquals(2, resultFuture.get().getErrors().size());
+
+        final InOrder inOrder = Mockito.inOrder(salFlowService, transactionService);
+        inOrder.verify(salFlowService, Mockito.times(2)).updateFlow(updateFlowInputCpt.capture());
+        final List<UpdateFlowInput> allValues = updateFlowInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getOriginalFlow().getPriority().longValue());
+        Assert.assertEquals(43, allValues.get(0).getUpdatedFlow().getPriority().longValue());
+        Assert.assertEquals(44, allValues.get(1).getOriginalFlow().getPriority().longValue());
+        Assert.assertEquals(45, allValues.get(1).getUpdatedFlow().getPriority().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+}
\ No newline at end of file
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/BarrierUtilTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/BarrierUtilTest.java
new file mode 100644 (file)
index 0000000..9c6cf99
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * Test for {@link BarrierUtil}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class BarrierUtilTest {
+
+    public static final NodeKey NODE_KEY = new NodeKey(new NodeId("ut-dummy-node"));
+    private static final NodeRef NODE_REF = new NodeRef(InstanceIdentifier.create(Nodes.class)
+            .child(Node.class, NODE_KEY));
+
+    @Mock
+    private FlowCapableTransactionService transactionService;
+    @Mock
+    private Function<Pair<RpcResult<String>, RpcResult<Void>>, RpcResult<String>> compositeTransform;
+    @Captor
+    private ArgumentCaptor<Pair<RpcResult<String>, RpcResult<Void>>> pairCpt;
+
+    @Before
+    public void setUp() throws Exception {
+        Mockito.when(transactionService.sendBarrier(Matchers.<SendBarrierInput>any()))
+                .thenReturn(RpcResultBuilder.<Void>success().buildFuture());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Mockito.verifyNoMoreInteractions(transactionService, compositeTransform);
+    }
+
+    @Test
+    public void testChainBarrier() throws Exception {
+        final String data = "ut-data1";
+        final ListenableFuture<RpcResult<String>> input = RpcResultBuilder.success(data).buildFuture();
+        final ListenableFuture<RpcResult<String>> chainResult =
+                BarrierUtil.chainBarrier(input, NODE_REF, transactionService, compositeTransform);
+
+        Mockito.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+        Mockito.verify(compositeTransform).apply(pairCpt.capture());
+
+        final Pair<RpcResult<String>, RpcResult<Void>> value = pairCpt.getValue();
+        Assert.assertTrue(value.getLeft().isSuccessful());
+        Assert.assertEquals(data, value.getLeft().getResult());
+        Assert.assertTrue(value.getRight().isSuccessful());
+        Assert.assertNull(value.getRight().getResult());
+
+    }
+
+    @Test
+    public void testCreateSendBarrierInput() throws Exception {
+        final SendBarrierInput barrierInput = BarrierUtil.createSendBarrierInput(NODE_REF);
+
+        Assert.assertEquals(NODE_REF, barrierInput.getNode());
+        Assert.assertEquals(SendBarrierInput.class, barrierInput.getImplementedInterface());
+    }
+}
\ No newline at end of file
index 61386b44fcb108bf707125c1bdf82807cc84e425..2c3feb93db9ca46c9d623411dedabe8b6f5b1b88 100644 (file)
 
 package org.opendaylight.openflowplugin.impl.util;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Test;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.AddFlowsBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowIdGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.BatchFlowOutputListGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flows.service.rev160314.batch.flow.output.list.grouping.BatchFailedFlowsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 
 public class FlowUtilTest {
 
-    private static final short DUMMY_TABLE_ID = 1;
     public static final Pattern INDEX_PATTERN = Pattern.compile("^#UF\\$TABLE\\*1-([0-9]+)$");
+    public static final NodeId DUMMY_NODE_ID = new NodeId("dummyNodeId");
+    public static final FlowId DUMMY_FLOW_ID = new FlowId("dummyFlowId");
+    public static final FlowId DUMMY_FLOW_ID_2 = new FlowId("dummyFlowId_2");
+    public static final Short DUMMY_TABLE_ID = 1;
 
     @Test
     public void createAlienFlowIdTest() {
@@ -33,8 +58,8 @@ public class FlowUtilTest {
         final String alienFlowId2 = FlowUtil.createAlienFlowId(DUMMY_TABLE_ID).getValue();
         final Integer index2 = parseIndex(alienFlowId2);
 
-        assertNotNull("index1 parsing failed: "+alienFlowId1, index1);
-        assertNotNull("index2 parsing failed: "+alienFlowId2, index2);
+        assertNotNull("index1 parsing failed: " + alienFlowId1, index1);
+        assertNotNull("index2 parsing failed: " + alienFlowId2, index2);
         assertTrue(index1 < index2);
     }
 
@@ -46,4 +71,190 @@ public class FlowUtilTest {
         return null;
     }
 
+    @Test
+    public void testBuildFlowPath() throws Exception {
+        final InstanceIdentifier<Node> nodePath = InstanceIdentifier
+                .create(Nodes.class)
+                .child(Node.class, new NodeKey(DUMMY_NODE_ID));
+
+        final FlowRef flowRef = FlowUtil.buildFlowPath(nodePath, DUMMY_TABLE_ID, DUMMY_FLOW_ID);
+        final InstanceIdentifier<?> flowRefValue = flowRef.getValue();
+        Assert.assertEquals(DUMMY_NODE_ID, flowRefValue.firstKeyOf(Node.class).getId());
+        Assert.assertEquals(DUMMY_TABLE_ID, flowRefValue.firstKeyOf(Table.class).getId());
+        Assert.assertEquals(DUMMY_FLOW_ID, flowRefValue.firstKeyOf(Flow.class).getId());
+    }
+
+    @Test
+    public void testCreateCumulatingFunction() throws Exception {
+        final Function<List<RpcResult<String>>, RpcResult<List<BatchFailedFlowsOutput>>> function =
+                FlowUtil.createCumulatingFunction(Lists.newArrayList(createBatchFlowIdGrouping(DUMMY_FLOW_ID),
+                        createBatchFlowIdGrouping(DUMMY_FLOW_ID_2)));
+
+        final RpcResult<List<BatchFailedFlowsOutput>> summary = function.apply(Lists.newArrayList(
+                RpcResultBuilder.success("a").build(),
+                RpcResultBuilder.<String>failed()
+                        .withError(RpcError.ErrorType.APPLICATION, "action-failed reason")
+                        .build()));
+
+        Assert.assertFalse(summary.isSuccessful());
+        Assert.assertEquals(1, summary.getResult().size());
+        Assert.assertEquals(1, summary.getErrors().size());
+        Assert.assertEquals(DUMMY_FLOW_ID_2, summary.getResult().get(0).getFlowId());
+        Assert.assertEquals(1, summary.getResult().get(0).getBatchOrder().intValue());
+    }
+
+    protected BatchFlowIdGrouping createBatchFlowIdGrouping(final FlowId flowId) {
+        final BatchFlowIdGrouping mock = Mockito.mock(BatchFlowIdGrouping.class);
+        Mockito.when(mock.getFlowId()).thenReturn(flowId);
+        return mock;
+    }
+
+    @Test
+    public void testFLOW_ADD_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(FlowUtil.FLOW_ADD_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_ADD_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(FlowUtil.FLOW_ADD_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_REMOVE_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(FlowUtil.FLOW_REMOVE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_REMOVE_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(FlowUtil.FLOW_REMOVE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_UPDATE_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(FlowUtil.FLOW_UPDATE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_UPDATE_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedFlowsOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(FlowUtil.FLOW_UPDATE_TRANSFORM.apply(input));
+    }
+
+    private <T extends BatchFlowOutputListGrouping> void checkBatchSuccessOutcomeTransformation(final RpcResult<T> output) {
+        Assert.assertTrue(output.isSuccessful());
+        Assert.assertEquals(0, output.getResult().getBatchFailedFlowsOutput().size());
+        Assert.assertEquals(0, output.getErrors().size());
+    }
+
+    private RpcResult<List<BatchFailedFlowsOutput>> createEmptyBatchOutcome() {
+        return RpcResultBuilder
+                .<List<BatchFailedFlowsOutput>>success(Collections.<BatchFailedFlowsOutput>emptyList())
+                .build();
+    }
+
+    private RpcResult<List<BatchFailedFlowsOutput>> createBatchOutcomeWithError() {
+        return RpcResultBuilder.<List<BatchFailedFlowsOutput>>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "ut-flowAddFail")
+                .withResult(Collections.singletonList(new BatchFailedFlowsOutputBuilder()
+                        .setFlowId(DUMMY_FLOW_ID)
+                        .build()))
+                .build();
+    }
+
+    private <T extends BatchFlowOutputListGrouping> void checkBatchErrorOutcomeTransformation(final RpcResult<T> output) {
+        Assert.assertFalse(output.isSuccessful());
+        Assert.assertEquals(1, output.getResult().getBatchFailedFlowsOutput().size());
+        Assert.assertEquals(DUMMY_FLOW_ID, output.getResult().getBatchFailedFlowsOutput().get(0).getFlowId());
+
+        Assert.assertEquals(1, output.getErrors().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_success_success() throws Exception {
+        final Function<Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>>, RpcResult<AddFlowsBatchOutput>> compositeFunction =
+                FlowUtil.createComposingFunction();
+
+        final RpcResult<AddFlowsBatchOutput> addFlowBatchOutput = createAddFlowsBatchSuccessOutput();
+        final RpcResult<Void> barrierOutput = RpcResultBuilder.<Void>success().build();
+        final Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>> input = Pair.of(addFlowBatchOutput, barrierOutput);
+        final RpcResult<AddFlowsBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertTrue(composite.isSuccessful());
+        Assert.assertEquals(0, composite.getErrors().size());
+        Assert.assertEquals(0, composite.getResult().getBatchFailedFlowsOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_failure_success() throws Exception {
+        final Function<Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>>, RpcResult<AddFlowsBatchOutput>> compositeFunction =
+                FlowUtil.createComposingFunction();
+
+        final RpcResult<AddFlowsBatchOutput> addFlowBatchOutput = createAddFlowsBatchFailureOutcome();
+        final RpcResult<Void> barrierOutput = RpcResultBuilder.<Void>success().build();
+        final Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>> input = Pair.of(addFlowBatchOutput, barrierOutput);
+        final RpcResult<AddFlowsBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(1, composite.getErrors().size());
+        Assert.assertEquals(1, composite.getResult().getBatchFailedFlowsOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_success_failure() throws Exception {
+        final Function<Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>>, RpcResult<AddFlowsBatchOutput>> compositeFunction =
+                FlowUtil.createComposingFunction();
+
+        final RpcResult<AddFlowsBatchOutput> addFlowBatchOutput = createAddFlowsBatchSuccessOutput();
+        final RpcResult<Void> barrierOutput = createBarrierFailureOutcome();
+        final Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>> input = Pair.of(addFlowBatchOutput, barrierOutput);
+        final RpcResult<AddFlowsBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(1, composite.getErrors().size());
+        Assert.assertEquals(0, composite.getResult().getBatchFailedFlowsOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_failure_failure() throws Exception {
+        final Function<Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>>, RpcResult<AddFlowsBatchOutput>> compositeFunction =
+                FlowUtil.createComposingFunction();
+
+        final RpcResult<AddFlowsBatchOutput> addFlowBatchOutput = createAddFlowsBatchFailureOutcome();
+        final RpcResult<Void> barrierOutput = createBarrierFailureOutcome();
+        final Pair<RpcResult<AddFlowsBatchOutput>, RpcResult<Void>> input = Pair.of(addFlowBatchOutput, barrierOutput);
+        final RpcResult<AddFlowsBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(2, composite.getErrors().size());
+        Assert.assertEquals(1, composite.getResult().getBatchFailedFlowsOutput().size());
+    }
+
+    private RpcResult<Void> createBarrierFailureOutcome() {
+        return RpcResultBuilder.<Void>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "ut-barrier-error")
+                .build();
+    }
+
+    private RpcResult<AddFlowsBatchOutput> createAddFlowsBatchSuccessOutput() {
+        return RpcResultBuilder
+                .success(new AddFlowsBatchOutputBuilder()
+                        .setBatchFailedFlowsOutput(Collections.<BatchFailedFlowsOutput>emptyList())
+                        .build())
+                .build();
+    }
+
+    private RpcResult<AddFlowsBatchOutput> createAddFlowsBatchFailureOutcome() {
+        final RpcResult<List<BatchFailedFlowsOutput>> batchOutcomeWithError = createBatchOutcomeWithError();
+        return RpcResultBuilder.<AddFlowsBatchOutput>failed()
+                .withResult(new AddFlowsBatchOutputBuilder()
+                        .setBatchFailedFlowsOutput(batchOutcomeWithError.getResult())
+                        .build())
+                .withRpcErrors(batchOutcomeWithError.getErrors())
+                .build();
+    }
 }
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/PathUtilTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/PathUtilTest.java
new file mode 100644 (file)
index 0000000..d44381b
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link PathUtil}.
+ */
+public class PathUtilTest {
+
+    public static final NodeId NODE_ID = new NodeId("ut-dummy-node");
+    public static final NodeKey NODE_KEY = new NodeKey(NODE_ID);
+    public static final NodeRef NODE_REF = new NodeRef(InstanceIdentifier.create(Nodes.class).child(Node.class, NODE_KEY));
+
+    @Test
+    public void testExtractNodeId() throws Exception {
+        Assert.assertEquals(NODE_ID, PathUtil.extractNodeId(NODE_REF));
+    }
+}
\ No newline at end of file