BUG-5574: bulk meters proposal 09/36309/6
authorMichal Rehak <mirehak@cisco.com>
Wed, 16 Mar 2016 12:48:00 +0000 (13:48 +0100)
committerMichal Rehak <mirehak@cisco.com>
Thu, 21 Apr 2016 11:01:33 +0000 (13:01 +0200)
    - added bulk API implementation for groups
    - with parametrized barrier-after support
    - MeterUtil
    - unit tests

Change-Id: I9146894ae0b1f630706b8aea6b85a13c8ce3364b
Signed-off-by: Michal Rehak <mirehak@cisco.com>
(cherry picked from commit 26fa4ba6f87c23e159b74526f6fb849f0ceb4783)
Signed-off-by: andrej.leitner <anleitne@cisco.com>
model/model-flow-service/src/main/yang/sal-meters-batch.yang [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImpl.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MeterUtil.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImplTest.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/MeterUtilTest.java [new file with mode: 0644]

diff --git a/model/model-flow-service/src/main/yang/sal-meters-batch.yang b/model/model-flow-service/src/main/yang/sal-meters-batch.yang
new file mode 100644 (file)
index 0000000..a64a864
--- /dev/null
@@ -0,0 +1,97 @@
+module sal-meters-batch {
+    namespace "urn:opendaylight:meters:service";
+    prefix meters;
+
+    import barrier-common {prefix bc;revision-date "2016-03-15";}
+    import batch-common {prefix batch;revision-date "2016-03-22";}
+    import opendaylight-inventory {prefix inv;revision-date "2013-08-19";}
+    import opendaylight-meter-types {prefix meter-type;revision-date "2013-09-18";}
+
+    description "Openflow batch meter management.";
+
+    revision "2016-03-16" {
+        description "Initial revision of meter batch service";
+    }
+
+    grouping batch-meter-input-update-grouping {
+        description "Update openflow meter structure suitable for batch rpc input.";
+
+        // meter-id is included in meter-type:meter
+        container original-batched-meter {
+            uses meter-type:meter;
+        }
+        container updated-batched-meter {
+            uses meter-type:meter;
+        }
+    }
+
+    grouping batch-meter-output-list-grouping {
+        description "Openflow meter list suitable for batch rpc output.";
+
+         list batch-failed-meters-output {
+            key batch-order;
+
+            uses batch:batch-order-grouping;
+            leaf meter-id {
+                type meter-type:meter-id;
+            }
+         }
+    }
+
+    rpc add-meters-batch {
+        description "Adding batch meters to openflow device.";
+        input {
+            uses "inv:node-context-ref";
+
+            list batch-add-meters {
+                key meter-id;
+
+                leaf meter-ref {
+                    type meter-type:meter-ref;
+                }
+                uses meter-type:meter;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-meter-output-list-grouping;
+        }
+    }
+
+    rpc remove-meters-batch {
+        description "Removing batch meter from openflow device.";
+        input {
+            uses "inv:node-context-ref";
+
+            list batch-remove-meters {
+                key meter-id;
+
+                leaf meter-ref {
+                    type meter-type:meter-ref;
+                }
+                uses meter-type:meter;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-meter-output-list-grouping;
+        }
+    }
+
+    rpc update-meters-batch {
+        description "Updating batch meter on openflow device.";
+        input {
+            uses "inv:node-context-ref";
+            list batch-update-meters {
+                leaf meter-ref {
+                    type meter-type:meter-ref;
+                }
+                uses batch-meter-input-update-grouping;
+            }
+            uses bc:barrier-suffix;
+        }
+        output {
+            uses batch-meter-output-list-grouping;
+        }
+    }
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImpl.java
new file mode 100644 (file)
index 0000000..73fde2e
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.opendaylight.openflowplugin.impl.util.BarrierUtil;
+import org.opendaylight.openflowplugin.impl.util.MeterUtil;
+import org.opendaylight.openflowplugin.impl.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.OriginalMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.SalMetersBatchService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.add.meters.batch.input.BatchAddMeters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.output.list.grouping.BatchFailedMetersOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.remove.meters.batch.input.BatchRemoveMeters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.update.meters.batch.input.BatchUpdateMeters;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * default implementation of {@link SalMetersBatchService} - delegates work to {@link SalMeterService}
+ */
+public class SalMetersBatchServiceImpl implements SalMetersBatchService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SalMetersBatchServiceImpl.class);
+
+    private final SalMeterService salMeterService;
+    private final FlowCapableTransactionService transactionService;
+
+    public SalMetersBatchServiceImpl(final SalMeterService salMeterService, final FlowCapableTransactionService transactionService) {
+        this.salMeterService = Preconditions.checkNotNull(salMeterService);
+        this.transactionService = Preconditions.checkNotNull(transactionService);
+    }
+
+    @Override
+    public Future<RpcResult<UpdateMetersBatchOutput>> updateMetersBatch(final UpdateMetersBatchInput input) {
+        final List<BatchUpdateMeters> batchUpdateMeters = input.getBatchUpdateMeters();
+        LOG.trace("Updating meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), batchUpdateMeters.size());
+
+        final ArrayList<ListenableFuture<RpcResult<UpdateMeterOutput>>> resultsLot = new ArrayList<>();
+        for (BatchUpdateMeters batchMeter : batchUpdateMeters) {
+            final UpdateMeterInput updateMeterInput = new UpdateMeterInputBuilder(input)
+                    .setOriginalMeter(new OriginalMeterBuilder(batchMeter.getOriginalBatchedMeter()).build())
+                    .setUpdatedMeter(new UpdatedMeterBuilder(batchMeter.getUpdatedBatchedMeter()).build())
+                    .setMeterRef(createMeterRef(input.getNode(), batchMeter))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.updateMeter(updateMeterInput)));
+        }
+
+        final Iterable<Meter> meters = Iterables.transform(batchUpdateMeters, new Function<BatchUpdateMeters, Meter>() {
+                    @Nullable
+                    @Override
+                    public Meter apply(@Nullable final BatchUpdateMeters input) {
+                        return input.getUpdatedBatchedMeter();
+                    }
+                }
+        );
+
+        final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
+                Futures.transform(Futures.allAsList(resultsLot), MeterUtil.<UpdateMeterOutput>createCumulativeFunction(
+                        meters, batchUpdateMeters.size()));
+
+        ListenableFuture<RpcResult<UpdateMetersBatchOutput>> updateMetersBulkFuture =
+                Futures.transform(commonResult, MeterUtil.METER_UPDATE_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            updateMetersBulkFuture = BarrierUtil.chainBarrier(updateMetersBulkFuture, input.getNode(),
+                    transactionService, MeterUtil.METER_UPDATE_COMPOSING_TRANSFORM);
+        }
+
+        return updateMetersBulkFuture;
+    }
+
+    @Override
+    public Future<RpcResult<AddMetersBatchOutput>> addMetersBatch(final AddMetersBatchInput input) {
+        LOG.trace("Adding meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchAddMeters().size());
+        final ArrayList<ListenableFuture<RpcResult<AddMeterOutput>>> resultsLot = new ArrayList<>();
+        for (BatchAddMeters addMeter : input.getBatchAddMeters()) {
+            final AddMeterInput addMeterInput = new AddMeterInputBuilder(addMeter)
+                    .setMeterRef(createMeterRef(input.getNode(), addMeter))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.addMeter(addMeterInput)));
+        }
+
+        final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
+                Futures.transform(Futures.allAsList(resultsLot),
+                        MeterUtil.<AddMeterOutput>createCumulativeFunction(input.getBatchAddMeters()));
+
+        ListenableFuture<RpcResult<AddMetersBatchOutput>> addMetersBulkFuture =
+                Futures.transform(commonResult, MeterUtil.METER_ADD_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            addMetersBulkFuture = BarrierUtil.chainBarrier(addMetersBulkFuture, input.getNode(),
+                    transactionService, MeterUtil.METER_ADD_COMPOSING_TRANSFORM);
+        }
+
+        return addMetersBulkFuture;
+    }
+
+    @Override
+    public Future<RpcResult<RemoveMetersBatchOutput>> removeMetersBatch(final RemoveMetersBatchInput input) {
+        LOG.trace("Removing meters @ {} : {}", PathUtil.extractNodeId(input.getNode()), input.getBatchRemoveMeters().size());
+        final ArrayList<ListenableFuture<RpcResult<RemoveMeterOutput>>> resultsLot = new ArrayList<>();
+        for (BatchRemoveMeters addMeter : input.getBatchRemoveMeters()) {
+            final RemoveMeterInput removeMeterInput = new RemoveMeterInputBuilder(addMeter)
+                    .setMeterRef(createMeterRef(input.getNode(), addMeter))
+                    .setNode(input.getNode())
+                    .build();
+            resultsLot.add(JdkFutureAdapters.listenInPoolThread(salMeterService.removeMeter(removeMeterInput)));
+        }
+
+        final ListenableFuture<RpcResult<List<BatchFailedMetersOutput>>> commonResult =
+                Futures.transform(Futures.allAsList(resultsLot),
+                        MeterUtil.<RemoveMeterOutput>createCumulativeFunction(input.getBatchRemoveMeters()));
+
+        ListenableFuture<RpcResult<RemoveMetersBatchOutput>> removeMetersBulkFuture =
+                Futures.transform(commonResult, MeterUtil.METER_REMOVE_TRANSFORM);
+
+        if (input.isBarrierAfter()) {
+            removeMetersBulkFuture = BarrierUtil.chainBarrier(removeMetersBulkFuture, input.getNode(),
+                    transactionService, MeterUtil.METER_REMOVE_COMPOSING_TRANSFORM);
+        }
+
+        return removeMetersBulkFuture;
+    }
+
+    private static MeterRef createMeterRef(final NodeRef nodeRef, final Meter batchMeter) {
+        return MeterUtil.buildMeterPath((InstanceIdentifier<Node>) nodeRef.getValue(), batchMeter.getMeterId());
+    }
+
+    private static MeterRef createMeterRef(final NodeRef nodeRef, final BatchUpdateMeters batchMeter) {
+        return MeterUtil.buildMeterPath((InstanceIdentifier<Node>) nodeRef.getValue(),
+                batchMeter.getUpdatedBatchedMeter().getMeterId());
+    }
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MeterUtil.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MeterUtil.java
new file mode 100644 (file)
index 0000000..d224c70
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.BatchMeterOutputListGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.output.list.grouping.BatchFailedMetersOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.output.list.grouping.BatchFailedMetersOutputBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * provides meter util methods
+ */
+public final class MeterUtil {
+
+    private static final RpcResultBuilder<List<BatchFailedMetersOutput>> SUCCESSFUL_METER_OUTPUT_RPC_RESULT =
+            RpcResultBuilder.success(Collections.<BatchFailedMetersOutput>emptyList());
+
+    public static final Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<AddMetersBatchOutput>> METER_ADD_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<AddMetersBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<AddMetersBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedMetersOutput>> batchMetersCumulatedResult) {
+                    final AddMetersBatchOutput batchOutput = new AddMetersBatchOutputBuilder()
+                            .setBatchFailedMetersOutput(batchMetersCumulatedResult.getResult()).build();
+
+                    final RpcResultBuilder<AddMetersBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchMetersCumulatedResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
+    public static final Function<Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>>, RpcResult<AddMetersBatchOutput>>
+            METER_ADD_COMPOSING_TRANSFORM = createComposingFunction();
+
+    public static final Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<RemoveMetersBatchOutput>> METER_REMOVE_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<RemoveMetersBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<RemoveMetersBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedMetersOutput>> batchMetersCumulatedResult) {
+                    final RemoveMetersBatchOutput batchOutput = new RemoveMetersBatchOutputBuilder()
+                            .setBatchFailedMetersOutput(batchMetersCumulatedResult.getResult()).build();
+
+                    final RpcResultBuilder<RemoveMetersBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchMetersCumulatedResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
+    public static final Function<Pair<RpcResult<RemoveMetersBatchOutput>, RpcResult<Void>>, RpcResult<RemoveMetersBatchOutput>>
+            METER_REMOVE_COMPOSING_TRANSFORM = createComposingFunction();
+
+    public static final Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<UpdateMetersBatchOutput>> METER_UPDATE_TRANSFORM =
+            new Function<RpcResult<List<BatchFailedMetersOutput>>, RpcResult<UpdateMetersBatchOutput>>() {
+                @Nullable
+                @Override
+                public RpcResult<UpdateMetersBatchOutput> apply(@Nullable final RpcResult<List<BatchFailedMetersOutput>> batchMetersCumulatedResult) {
+                    final UpdateMetersBatchOutput batchOutput = new UpdateMetersBatchOutputBuilder()
+                            .setBatchFailedMetersOutput(batchMetersCumulatedResult.getResult()).build();
+
+                    final RpcResultBuilder<UpdateMetersBatchOutput> resultBld =
+                            createCumulativeRpcResult(batchMetersCumulatedResult, batchOutput);
+                    return resultBld.build();
+                }
+            };
+    public static final Function<Pair<RpcResult<UpdateMetersBatchOutput>, RpcResult<Void>>, RpcResult<UpdateMetersBatchOutput>>
+            METER_UPDATE_COMPOSING_TRANSFORM = createComposingFunction();
+
+    private MeterUtil() {
+        throw new IllegalStateException("This class should not be instantiated.");
+    }
+
+    /**
+     * @param nodePath
+     * @param meterId
+     * @return instance identifier assembled for given node and meter
+     */
+    public static MeterRef buildMeterPath(final InstanceIdentifier<Node> nodePath, final MeterId meterId) {
+        final KeyedInstanceIdentifier<Meter, MeterKey> meterPath = nodePath
+                .augmentation(FlowCapableNode.class)
+                .child(Meter.class, new MeterKey(meterId));
+
+        return new MeterRef(meterPath);
+    }
+
+    public static <O> Function<List<RpcResult<O>>, RpcResult<List<BatchFailedMetersOutput>>> createCumulativeFunction(
+            final Iterable<? extends org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter> inputBatchMeters) {
+        return createCumulativeFunction(inputBatchMeters, Iterables.size(inputBatchMeters));
+    }
+
+    public static <O> Function<List<RpcResult<O>>, RpcResult<List<BatchFailedMetersOutput>>> createCumulativeFunction(
+            final Iterable<? extends org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter> inputBatchMeters,
+            final int sizeOfInputBatch) {
+        return new Function<List<RpcResult<O>>, RpcResult<List<BatchFailedMetersOutput>>>() {
+            @Nullable
+            @Override
+            public RpcResult<List<BatchFailedMetersOutput>> apply(@Nullable final List<RpcResult<O>> innerInput) {
+                final int sizeOfFutures = innerInput.size();
+                Preconditions.checkArgument(sizeOfFutures == sizeOfInputBatch,
+                        "wrong amount of returned futures: {} <> {}", sizeOfFutures, sizeOfInputBatch);
+
+                final List<BatchFailedMetersOutput> batchMeters = new ArrayList<>();
+                final Iterator<? extends org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter>
+                        batchMeterIterator = inputBatchMeters.iterator();
+
+                Collection<RpcError> meterErrors = new ArrayList<>(sizeOfFutures);
+
+                int batchOrder = 0;
+                for (RpcResult<O> meterModOutput : innerInput) {
+                    final MeterId meterId = batchMeterIterator.next().getMeterId();
+
+                    if (!meterModOutput.isSuccessful()) {
+                        batchMeters.add(new BatchFailedMetersOutputBuilder()
+                                .setBatchOrder(batchOrder)
+                                .setMeterId(meterId)
+                                .build());
+                        meterErrors.addAll(meterModOutput.getErrors());
+                    }
+                    batchOrder++;
+                }
+
+                final RpcResultBuilder<List<BatchFailedMetersOutput>> resultBuilder;
+                if (!meterErrors.isEmpty()) {
+                    resultBuilder = RpcResultBuilder.<List<BatchFailedMetersOutput>>failed()
+                            .withRpcErrors(meterErrors).withResult(batchMeters);
+                } else {
+                    resultBuilder = SUCCESSFUL_METER_OUTPUT_RPC_RESULT;
+                }
+                return resultBuilder.build();
+            }
+        };
+    }
+
+    /**
+     * Factory method: create {@link Function} which attaches barrier response to given {@link RpcResult}&lt;T&gt;
+     * and changes success flag if needed.
+     * <br>
+     * Original rpcResult is the {@link Pair#getLeft()} and barrier result is the {@link Pair#getRight()}.
+     *
+     * @param <T> type of rpcResult value
+     * @return reusable static function
+     */
+    @VisibleForTesting
+    static <T extends BatchMeterOutputListGrouping>
+    Function<Pair<RpcResult<T>, RpcResult<Void>>, RpcResult<T>> createComposingFunction() {
+        return new Function<Pair<RpcResult<T>, RpcResult<Void>>, RpcResult<T>>() {
+            @Nullable
+            @Override
+            public RpcResult<T> apply(@Nullable final Pair<RpcResult<T>, RpcResult<Void>> input) {
+                final RpcResultBuilder<T> resultBld;
+                if (input.getLeft().isSuccessful() && input.getRight().isSuccessful()) {
+                    resultBld = RpcResultBuilder.success();
+                } else {
+                    resultBld = RpcResultBuilder.failed();
+                }
+
+                final ArrayList<RpcError> rpcErrors = new ArrayList<>(input.getLeft().getErrors());
+                rpcErrors.addAll(input.getRight().getErrors());
+                resultBld.withRpcErrors(rpcErrors);
+
+                resultBld.withResult(input.getLeft().getResult());
+
+                return resultBld.build();
+            }
+        };
+    }
+
+    /**
+     * Wrap given list of problematic group-ids into {@link RpcResult} of given type.
+     *
+     * @param batchMetersCumulativeResult list of ids failed groups
+     * @param batchOutput
+     * @param <T>                         group operation type
+     * @return batch group operation output of given type containing list of group-ids and corresponding success flag
+     */
+    private static <T extends BatchMeterOutputListGrouping>
+    RpcResultBuilder<T> createCumulativeRpcResult(final @Nullable RpcResult<List<BatchFailedMetersOutput>> batchMetersCumulativeResult,
+                                                  final T batchOutput) {
+        final RpcResultBuilder<T> resultBld;
+        if (batchMetersCumulativeResult.isSuccessful()) {
+            resultBld = RpcResultBuilder.success(batchOutput);
+        } else {
+            resultBld = RpcResultBuilder.failed();
+            resultBld.withResult(batchOutput)
+                    .withRpcErrors(batchMetersCumulativeResult.getErrors());
+        }
+        return resultBld;
+    }
+}
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalMetersBatchServiceImplTest.java
new file mode 100644 (file)
index 0000000..6873615
--- /dev/null
@@ -0,0 +1,313 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.Future;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.RemoveMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.UpdateMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.add.meters.batch.input.BatchAddMeters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.add.meters.batch.input.BatchAddMetersBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.remove.meters.batch.input.BatchRemoveMeters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.remove.meters.batch.input.BatchRemoveMetersBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.update.meters.batch.input.BatchUpdateMeters;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.update.meters.batch.input.BatchUpdateMetersBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * Test for {@link SalMetersBatchServiceImpl}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SalMetersBatchServiceImplTest {
+
+    public static final NodeId NODE_ID = new NodeId("ut-dummy-node");
+    public static final NodeKey NODE_KEY = new NodeKey(NODE_ID);
+    public static final NodeRef NODE_REF = new NodeRef(InstanceIdentifier.create(Nodes.class).child(Node.class, NODE_KEY));
+
+    @Mock
+    private SalMeterService salMeterService;
+    @Mock
+    private FlowCapableTransactionService transactionService;
+    @Captor
+    private ArgumentCaptor<RemoveMeterInput> removeMeterInputCpt;
+    @Captor
+    private ArgumentCaptor<UpdateMeterInput> updateMeterInputCpt;
+    @Captor
+    private ArgumentCaptor<AddMeterInput> addMeterInputCpt;
+
+    private SalMetersBatchServiceImpl salMetersBatchService;
+
+    @Before
+    public void setUp() throws Exception {
+        salMetersBatchService = new SalMetersBatchServiceImpl(salMeterService, transactionService);
+
+        Mockito.when(transactionService.sendBarrier(Matchers.<SendBarrierInput>any()))
+                .thenReturn(RpcResultBuilder.<Void>success().buildFuture());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Mockito.verifyNoMoreInteractions(salMeterService, transactionService);
+    }
+
+    @Test
+    public void testUpdateMetersBatch_success() throws Exception {
+        Mockito.when(salMeterService.updateMeter(Mockito.<UpdateMeterInput>any()))
+                .thenReturn(RpcResultBuilder.success(new UpdateMeterOutputBuilder().build()).buildFuture());
+
+        final UpdateMetersBatchInput input = new UpdateMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchUpdateMeters(Lists.newArrayList(
+                        createEmptyBatchUpdateMeter(42L),
+                        createEmptyBatchUpdateMeter(44L)))
+                .build();
+
+        final Future<RpcResult<UpdateMetersBatchOutput>> resultFuture = salMetersBatchService.updateMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertTrue(resultFuture.get().isSuccessful());
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+        inOrder.verify(salMeterService, Mockito.times(2)).updateMeter(updateMeterInputCpt.capture());
+        final List<UpdateMeterInput> allValues = updateMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getOriginalMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(43, allValues.get(0).getUpdatedMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(44, allValues.get(1).getOriginalMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(45, allValues.get(1).getUpdatedMeter().getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testUpdateMetersBatch_failure() throws Exception {
+        Mockito.when(salMeterService.updateMeter(Mockito.<UpdateMeterInput>any()))
+                .thenReturn(RpcResultBuilder.<UpdateMeterOutput>failed()
+                        .withError(RpcError.ErrorType.APPLICATION, "ur-groupUpdateError")
+                        .buildFuture());
+
+        final UpdateMetersBatchInput input = new UpdateMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchUpdateMeters(Lists.newArrayList(
+                        createEmptyBatchUpdateMeter(42L),
+                        createEmptyBatchUpdateMeter(44L)))
+                .build();
+
+        final Future<RpcResult<UpdateMetersBatchOutput>> resultFuture = salMetersBatchService.updateMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertEquals(2, resultFuture.get().getResult().getBatchFailedMetersOutput().size());
+        Assert.assertEquals(43L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(45L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(1).getMeterId().getValue().longValue());
+        Assert.assertEquals(2, resultFuture.get().getErrors().size());
+
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+        inOrder.verify(salMeterService, Mockito.times(2)).updateMeter(updateMeterInputCpt.capture());
+        final List<UpdateMeterInput> allValues = updateMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42, allValues.get(0).getOriginalMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(43, allValues.get(0).getUpdatedMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(44, allValues.get(1).getOriginalMeter().getMeterId().getValue().longValue());
+        Assert.assertEquals(45, allValues.get(1).getUpdatedMeter().getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+
+    @Test
+    public void testAddMetersBatch_success() throws Exception {
+        Mockito.when(salMeterService.addMeter(Mockito.<AddMeterInput>any()))
+                .thenReturn(RpcResultBuilder.success(new AddMeterOutputBuilder().build()).buildFuture());
+
+        final AddMetersBatchInput input = new AddMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchAddMeters(Lists.newArrayList(
+                        createEmptyBatchAddMeter(42L),
+                        createEmptyBatchAddMeter(43L)))
+                .build();
+
+        final Future<RpcResult<AddMetersBatchOutput>> resultFuture = salMetersBatchService.addMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertTrue(resultFuture.get().isSuccessful());
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+        inOrder.verify(salMeterService, Mockito.times(2)).addMeter(addMeterInputCpt.capture());
+        final List<AddMeterInput> allValues = addMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42L, allValues.get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, allValues.get(1).getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testAddMetersBatch_failure() throws Exception {
+        Mockito.when(salMeterService.addMeter(Mockito.<AddMeterInput>any()))
+                .thenReturn(RpcResultBuilder.<AddMeterOutput>failed().withError(RpcError.ErrorType.APPLICATION, "ut-groupAddError")
+                        .buildFuture());
+
+        final AddMetersBatchInput input = new AddMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchAddMeters(Lists.newArrayList(
+                        createEmptyBatchAddMeter(42L),
+                        createEmptyBatchAddMeter(43L)))
+                .build();
+
+        final Future<RpcResult<AddMetersBatchOutput>> resultFuture = salMetersBatchService.addMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertEquals(2, resultFuture.get().getResult().getBatchFailedMetersOutput().size());
+        Assert.assertEquals(42L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(1).getMeterId().getValue().longValue());
+        Assert.assertEquals(2, resultFuture.get().getErrors().size());
+
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+        inOrder.verify(salMeterService, Mockito.times(2)).addMeter(addMeterInputCpt.capture());
+        final List<AddMeterInput> allValues = addMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42L, allValues.get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, allValues.get(1).getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testRemoveMetersBatch_success() throws Exception {
+        Mockito.when(salMeterService.removeMeter(Mockito.<RemoveMeterInput>any()))
+                .thenReturn(RpcResultBuilder.success(new RemoveMeterOutputBuilder().build()).buildFuture());
+
+        final RemoveMetersBatchInput input = new RemoveMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchRemoveMeters(Lists.newArrayList(
+                        createEmptyBatchRemoveMeter(42L),
+                        createEmptyBatchRemoveMeter(43L)))
+                .build();
+
+        final Future<RpcResult<RemoveMetersBatchOutput>> resultFuture = salMetersBatchService.removeMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertTrue(resultFuture.get().isSuccessful());
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+
+        inOrder.verify(salMeterService, Mockito.times(2)).removeMeter(removeMeterInputCpt.capture());
+        final List<RemoveMeterInput> allValues = removeMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42L, allValues.get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, allValues.get(1).getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    @Test
+    public void testRemoveMetersBatch_failure() throws Exception {
+        Mockito.when(salMeterService.removeMeter(Mockito.<RemoveMeterInput>any()))
+                .thenReturn(RpcResultBuilder.<RemoveMeterOutput>failed().withError(RpcError.ErrorType.APPLICATION, "ut-groupRemoveError")
+                        .buildFuture());
+
+        final RemoveMetersBatchInput input = new RemoveMetersBatchInputBuilder()
+                .setNode(NODE_REF)
+                .setBarrierAfter(true)
+                .setBatchRemoveMeters(Lists.newArrayList(
+                        createEmptyBatchRemoveMeter(42L),
+                        createEmptyBatchRemoveMeter(43L)))
+                .build();
+
+        final Future<RpcResult<RemoveMetersBatchOutput>> resultFuture = salMetersBatchService.removeMetersBatch(input);
+
+        Assert.assertTrue(resultFuture.isDone());
+        Assert.assertFalse(resultFuture.get().isSuccessful());
+        Assert.assertEquals(2, resultFuture.get().getResult().getBatchFailedMetersOutput().size());
+        Assert.assertEquals(42L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, resultFuture.get().getResult().getBatchFailedMetersOutput().get(1).getMeterId().getValue().longValue());
+        Assert.assertEquals(2, resultFuture.get().getErrors().size());
+
+        final InOrder inOrder = Mockito.inOrder(salMeterService, transactionService);
+
+        inOrder.verify(salMeterService, Mockito.times(2)).removeMeter(removeMeterInputCpt.capture());
+        final List<RemoveMeterInput> allValues = removeMeterInputCpt.getAllValues();
+        Assert.assertEquals(2, allValues.size());
+        Assert.assertEquals(42L, allValues.get(0).getMeterId().getValue().longValue());
+        Assert.assertEquals(43L, allValues.get(1).getMeterId().getValue().longValue());
+
+        inOrder.verify(transactionService).sendBarrier(Matchers.<SendBarrierInput>any());
+    }
+
+    private static BatchAddMeters createEmptyBatchAddMeter(final long groupIdValue) {
+        return new BatchAddMetersBuilder()
+                .setMeterId(new MeterId(groupIdValue))
+                .build();
+    }
+
+    private static BatchRemoveMeters createEmptyBatchRemoveMeter(final long groupIdValue) {
+        return new BatchRemoveMetersBuilder()
+                .setMeterId(new MeterId(groupIdValue))
+                .build();
+    }
+
+    private static BatchUpdateMeters createEmptyBatchUpdateMeter(final long groupIdValue) {
+        return new BatchUpdateMetersBuilder()
+                .setOriginalBatchedMeter(new OriginalBatchedMeterBuilder(createEmptyBatchAddMeter(groupIdValue)).build())
+                .setUpdatedBatchedMeter(new UpdatedBatchedMeterBuilder(createEmptyBatchAddMeter(groupIdValue + 1)).build())
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/MeterUtilTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/util/MeterUtilTest.java
new file mode 100644 (file)
index 0000000..869f0bb
--- /dev/null
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.AddMetersBatchOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.BatchMeterOutputListGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.output.list.grouping.BatchFailedMetersOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.output.list.grouping.BatchFailedMetersOutputBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * Test for {@link MeterUtil}.
+ */
+public class MeterUtilTest {
+
+    public static final NodeId DUMMY_NODE_ID = new NodeId("dummyNodeId");
+    private static final MeterId DUMMY_METER_ID = new MeterId(42L);
+    private static final MeterId DUMMY_METER_ID_2 = new MeterId(43L);
+
+    @Test
+    public void testBuildGroupPath() throws Exception {
+        final InstanceIdentifier<Node> nodePath = InstanceIdentifier
+                .create(Nodes.class)
+                .child(Node.class, new NodeKey(DUMMY_NODE_ID));
+
+        final MeterRef meterRef = MeterUtil.buildMeterPath(nodePath, DUMMY_METER_ID);
+        final InstanceIdentifier<?> meterRefValue = meterRef.getValue();
+        Assert.assertEquals(DUMMY_NODE_ID, meterRefValue.firstKeyOf(Node.class).getId());
+        Assert.assertEquals(DUMMY_METER_ID, meterRefValue.firstKeyOf(Meter.class).getMeterId());
+    }
+
+    @Test
+    public void testCreateCumulatingFunction() throws Exception {
+        final Function<List<RpcResult<String>>, RpcResult<List<BatchFailedMetersOutput>>> function =
+                MeterUtil.createCumulativeFunction(Lists.newArrayList(
+                        createBatchMeter(DUMMY_METER_ID),
+                        createBatchMeter(DUMMY_METER_ID_2)));
+
+        final RpcResult<List<BatchFailedMetersOutput>> output = function.apply(Lists.newArrayList(
+                RpcResultBuilder.success("a").build(),
+                RpcResultBuilder.<String>failed()
+                        .withError(RpcError.ErrorType.APPLICATION, "ut-meter-error")
+                        .build()));
+
+        Assert.assertFalse(output.isSuccessful());
+        Assert.assertEquals(1, output.getResult().size());
+        Assert.assertEquals(DUMMY_METER_ID_2, output.getResult().get(0).getMeterId());
+        Assert.assertEquals(1, output.getResult().get(0).getBatchOrder().intValue());
+    }
+
+    private org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter createBatchMeter(final MeterId meterId) {
+        return new MeterBuilder()
+                .setMeterId(meterId)
+                .build();
+    }
+
+    @Test
+    public void testMETER_ADD_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(MeterUtil.METER_ADD_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testMETER_ADD_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(MeterUtil.METER_ADD_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testMETER_REMOVE_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(MeterUtil.METER_REMOVE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_REMOVE_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(MeterUtil.METER_REMOVE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_UPDATE_TRANSFORM__failure() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createBatchOutcomeWithError();
+        checkBatchErrorOutcomeTransformation(MeterUtil.METER_UPDATE_TRANSFORM.apply(input));
+    }
+
+    @Test
+    public void testFLOW_UPDATE_TRANSFORM__success() throws Exception {
+        final RpcResult<List<BatchFailedMetersOutput>> input = createEmptyBatchOutcome();
+        checkBatchSuccessOutcomeTransformation(MeterUtil.METER_UPDATE_TRANSFORM.apply(input));
+    }
+
+    private <T extends BatchMeterOutputListGrouping> void checkBatchSuccessOutcomeTransformation(final RpcResult<T> output) {
+        Assert.assertTrue(output.isSuccessful());
+        Assert.assertEquals(0, output.getResult().getBatchFailedMetersOutput().size());
+        Assert.assertEquals(0, output.getErrors().size());
+    }
+
+    private RpcResult<List<BatchFailedMetersOutput>> createEmptyBatchOutcome() {
+        return RpcResultBuilder
+                .<List<BatchFailedMetersOutput>>success(Collections.<BatchFailedMetersOutput>emptyList())
+                .build();
+    }
+
+    private RpcResult<List<BatchFailedMetersOutput>> createBatchOutcomeWithError() {
+        return RpcResultBuilder.<List<BatchFailedMetersOutput>>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "ut-flowAddFail")
+                .withResult(Collections.singletonList(new BatchFailedMetersOutputBuilder()
+                        .setMeterId(DUMMY_METER_ID)
+                        .build()))
+                .build();
+    }
+
+    private <T extends BatchMeterOutputListGrouping> void checkBatchErrorOutcomeTransformation(final RpcResult<T> output) {
+        Assert.assertFalse(output.isSuccessful());
+        Assert.assertEquals(1, output.getResult().getBatchFailedMetersOutput().size());
+        Assert.assertEquals(DUMMY_METER_ID, output.getResult().getBatchFailedMetersOutput().get(0).getMeterId());
+
+        Assert.assertEquals(1, output.getErrors().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_success_success() throws Exception {
+        final Function<Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>>, RpcResult<AddMetersBatchOutput>> compositeFunction =
+                MeterUtil.createComposingFunction();
+
+        final RpcResult<AddMetersBatchOutput> addGroupBatchOutput = createAddMetersBatchSuccessOutput();
+        final RpcResult<Void> barrierOutput = RpcResultBuilder.<Void>success().build();
+        final Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>> input = Pair.of(addGroupBatchOutput, barrierOutput);
+        final RpcResult<AddMetersBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertTrue(composite.isSuccessful());
+        Assert.assertEquals(0, composite.getErrors().size());
+        Assert.assertEquals(0, composite.getResult().getBatchFailedMetersOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_failure_success() throws Exception {
+        final Function<Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>>, RpcResult<AddMetersBatchOutput>> compositeFunction =
+                MeterUtil.createComposingFunction();
+
+        final RpcResult<AddMetersBatchOutput> addGroupBatchOutput = createAddMetersBatchFailureOutcome();
+        final RpcResult<Void> barrierOutput = RpcResultBuilder.<Void>success().build();
+        final Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>> input = Pair.of(addGroupBatchOutput, barrierOutput);
+        final RpcResult<AddMetersBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(1, composite.getErrors().size());
+        Assert.assertEquals(1, composite.getResult().getBatchFailedMetersOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_success_failure() throws Exception {
+        final Function<Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>>, RpcResult<AddMetersBatchOutput>> compositeFunction =
+                MeterUtil.createComposingFunction();
+
+        final RpcResult<AddMetersBatchOutput> addGroupBatchOutput = createAddMetersBatchSuccessOutput();
+        final RpcResult<Void> barrierOutput = createBarrierFailureOutcome();
+        final Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>> input = Pair.of(addGroupBatchOutput, barrierOutput);
+        final RpcResult<AddMetersBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(1, composite.getErrors().size());
+        Assert.assertEquals(0, composite.getResult().getBatchFailedMetersOutput().size());
+    }
+
+    @Test
+    public void testCreateComposingFunction_failure_failure() throws Exception {
+        final Function<Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>>, RpcResult<AddMetersBatchOutput>> compositeFunction =
+                MeterUtil.createComposingFunction();
+
+        final RpcResult<AddMetersBatchOutput> addGroupBatchOutput = createAddMetersBatchFailureOutcome();
+        final RpcResult<Void> barrierOutput = createBarrierFailureOutcome();
+        final Pair<RpcResult<AddMetersBatchOutput>, RpcResult<Void>> input = Pair.of(addGroupBatchOutput, barrierOutput);
+        final RpcResult<AddMetersBatchOutput> composite = compositeFunction.apply(input);
+
+        Assert.assertFalse(composite.isSuccessful());
+        Assert.assertEquals(2, composite.getErrors().size());
+        Assert.assertEquals(1, composite.getResult().getBatchFailedMetersOutput().size());
+    }
+
+    private RpcResult<Void> createBarrierFailureOutcome() {
+        return RpcResultBuilder.<Void>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "ut-barrier-error")
+                .build();
+    }
+
+    private RpcResult<AddMetersBatchOutput> createAddMetersBatchSuccessOutput() {
+        return RpcResultBuilder
+                .success(new AddMetersBatchOutputBuilder()
+                        .setBatchFailedMetersOutput(Collections.<BatchFailedMetersOutput>emptyList())
+                        .build())
+                .build();
+    }
+
+    private RpcResult<AddMetersBatchOutput> createAddMetersBatchFailureOutcome() {
+        final RpcResult<List<BatchFailedMetersOutput>> batchOutcomeWithError = createBatchOutcomeWithError();
+        return RpcResultBuilder.<AddMetersBatchOutput>failed()
+                .withResult(new AddMetersBatchOutputBuilder()
+                        .setBatchFailedMetersOutput(batchOutcomeWithError.getResult())
+                        .build())
+                .withRpcErrors(batchOutcomeWithError.getErrors())
+                .build();
+    }
+}
\ No newline at end of file