Merge "Bug-4957: Make async operational DS Read"
authormichal rehak <mirehak@cisco.com>
Wed, 2 Mar 2016 16:27:38 +0000 (16:27 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 2 Mar 2016 16:27:38 +0000 (16:27 +0000)
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallbackTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtilsTest.java

index be73b4908fa39b9f4a5aec12d78cf70912e2fa08..d0c282d0ab42896d54dfeec212584903234fd00a 100644 (file)
@@ -17,7 +17,7 @@ import java.util.List;
 import io.netty.util.Timeout;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -114,8 +114,9 @@ public interface DeviceContext extends AutoCloseable,
      * represented by this context. This read only transaction has a fresh dataStore snapshot.
      * There is a possibility to get different data set from  DataStore
      * as write transaction in this context.
+     * @return readOnlyTransaction - Don't forget to close it after finish reading
      */
-    ReadTransaction getReadTransaction();
+    ReadOnlyTransaction getReadTransaction();
 
 
     /**
index 5bd3dddc812e8bdbfa8850d53f6a2ea3e5de2b1a..c9a17b66ff0ad51d79760821b3823975eaf4db1e 100644 (file)
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
@@ -223,7 +223,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public ReadTransaction getReadTransaction() {
+    public ReadOnlyTransaction getReadTransaction() {
         return dataBroker.newReadOnlyTransaction();
     }
 
index ea325ea48d627d103a808afcb44957640a4d84c0..aacfa703c5744cec709595871d1e90d65deb577e 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.collect.Iterables;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -72,24 +74,33 @@ final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback<Lis
             setResult(rpcResultBuilder.build());
             endCollecting();
         } else {
-            MultipartReply multipartReply = (MultipartReply) result;
+            final MultipartReply multipartReply = (MultipartReply) result;
 
-            Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
             final MultipartReply singleReply = multipartReply;
             final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
-            allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+            final Iterable<? extends DataObject> allMultipartData = multipartDataList;
 
             //TODO: following part is focused on flow stats only - need more general approach if used for more than flow stats
+            ListenableFuture<Void> future;
             if (virgin) {
-                StatisticsGatheringUtils.deleteAllKnownFlows(deviceContext);
+                future = StatisticsGatheringUtils.deleteAllKnownFlows(deviceContext);
                 virgin = false;
+            } else {
+                future = Futures.immediateFuture(null);
             }
-            StatisticsGatheringUtils.writeFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
-            // ^^^^
 
-            if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
-                endCollecting();
-            }
+            Futures.transform(future, new Function<Void, Void>() {
+
+                @Override
+                public Void apply(final Void input) {
+                    StatisticsGatheringUtils.writeFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData,deviceContext);
+
+                    if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
+                        endCollecting();
+                    }
+                    return input;
+                }
+            });
         }
     }
 
index 4730e085bd6e552e7677e2ef29971aa7eaa22e4b..02e86f33c3b80525f28107f8b97cc8077da03e1b 100644 (file)
@@ -11,15 +11,16 @@ package org.opendaylight.openflowplugin.impl.statistics;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -92,7 +93,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,7 +120,7 @@ public final class StatisticsGatheringUtils {
             wholeProcessEventIdentifier = new EventIdentifier(type.toString(), deviceId);
             EventsTimeCounter.markStart(wholeProcessEventIdentifier);
         }
-        EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceId);
+        final EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceId);
         final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
                 JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
                         ofpQueuToRequestContextEventIdentifier, type));
@@ -130,13 +130,13 @@ public final class StatisticsGatheringUtils {
     private static ListenableFuture<Boolean> transformAndStoreStatisticsData(final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture,
                                                                              final DeviceContext deviceContext,
                                                                              final EventIdentifier eventIdentifier, final MultipartType type) {
-        return Futures.transform(statisticsDataInFuture, new Function<RpcResult<List<MultipartReply>>, Boolean>() {
+        return Futures.transform(statisticsDataInFuture, new AsyncFunction<RpcResult<List<MultipartReply>>, Boolean>() {
             @Nullable
             @Override
-            public Boolean apply(final RpcResult<List<MultipartReply>> rpcResult) {
+            public ListenableFuture<Boolean> apply(final RpcResult<List<MultipartReply>> rpcResult) {
+                boolean isMultipartProcessed = Boolean.TRUE;
                 if (rpcResult.isSuccessful()) {
                     LOG.debug("Stats reply successfully received for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
-                    boolean isMultipartProcessed = Boolean.TRUE;
 
                     // TODO: in case the result value is null then multipart data probably got processed on the fly -
                     // TODO: this contract should by clearly stated and enforced - now simple true value is returned
@@ -151,10 +151,10 @@ public final class StatisticsGatheringUtils {
                                 multipartData = multipartDataList.get(0);
                                 allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
                             }
-                        } catch (Exception e) {
+                        } catch (final Exception e) {
                             LOG.warn("stats processing of type {} for node {} failed during transfomation step",
                                     type, deviceContext.getDeviceState().getNodeId(), e);
-                            throw e;
+                            return Futures.immediateFailedFuture(e);
                         }
 
 
@@ -170,8 +170,10 @@ public final class StatisticsGatheringUtils {
                             } else if (multipartData instanceof QueueStatisticsUpdate) {
                                 processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
                             } else if (multipartData instanceof FlowsStatisticsUpdate) {
-                                processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
-                                EventsTimeCounter.markEnd(eventIdentifier);
+                                /* FlowStat Processing is realized by NettyThread only by initPhase, otherwise it is realized
+                                 * by MD-SAL thread */
+                                return processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext, eventIdentifier);
+
                             } else if (multipartData instanceof GroupDescStatsUpdated) {
                                 processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
                             } else if (multipartData instanceof MeterConfigStatsUpdated) {
@@ -179,10 +181,10 @@ public final class StatisticsGatheringUtils {
                             } else {
                                 isMultipartProcessed = Boolean.FALSE;
                             }
-                        } catch (Exception e) {
+                        } catch (final Exception e) {
                             LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
                                     type, deviceContext.getDeviceState().getNodeId(), e);
-                            throw e;
+                            return Futures.immediateFailedFuture(e);
                         }
 
                         LOG.debug("Stats reply added to transaction for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
@@ -192,11 +194,11 @@ public final class StatisticsGatheringUtils {
                         LOG.debug("Stats reply was empty for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
                     }
 
-                    return isMultipartProcessed;
                 } else {
                     LOG.debug("Stats reply FAILED for node {} of type {}: {}", deviceContext.getDeviceState().getNodeId(), type, rpcResult.getErrors());
+                    isMultipartProcessed = Boolean.FALSE;
                 }
-                return Boolean.FALSE;
+                return Futures.immediateFuture(isMultipartProcessed);
             }
         });
     }
@@ -219,13 +221,22 @@ public final class StatisticsGatheringUtils {
         deviceContext.submitTransaction();
     }
 
-    private static void processFlowStatistics(final Iterable<FlowsStatisticsUpdate> data, final DeviceContext deviceContext) {
-        deleteAllKnownFlows(deviceContext);
-        writeFlowStatistics(data, deviceContext);
-        deviceContext.submitTransaction();
+    private static ListenableFuture<Boolean> processFlowStatistics(final Iterable<FlowsStatisticsUpdate> data,
+            final DeviceContext deviceContext, final EventIdentifier eventIdentifier) {
+        final ListenableFuture<Void> deleFuture = deleteAllKnownFlows(deviceContext);
+        return Futures.transform(deleFuture, new Function<Void, Boolean>() {
+
+            @Override
+            public Boolean apply(final Void input) {
+                writeFlowStatistics(data, deviceContext);
+                deviceContext.submitTransaction();
+                EventsTimeCounter.markEnd(eventIdentifier);
+                return Boolean.TRUE;
+            }
+        });
     }
 
-    public static void writeFlowStatistics(Iterable<FlowsStatisticsUpdate> data, DeviceContext deviceContext) {
+    public static void writeFlowStatistics(final Iterable<FlowsStatisticsUpdate> data, final DeviceContext deviceContext) {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
         for (final FlowsStatisticsUpdate flowsStatistics : data) {
             for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) {
@@ -257,29 +268,46 @@ public final class StatisticsGatheringUtils {
         return flowStatisticsDataBld;
     }
 
-    public static void deleteAllKnownFlows(final DeviceContext deviceContext) {
+    public static ListenableFuture<Void> deleteAllKnownFlows(final DeviceContext deviceContext) {
+        /* DeviceState.deviceSynchronized is a marker for actual phase - false means initPhase, true means noInitPhase */
         if (deviceContext.getDeviceState().deviceSynchronized()) {
-            InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
-            final Short numOfTablesOnDevice = deviceContext.getDeviceState().getFeatures().getTables();
-            for (short i = 0; i < numOfTablesOnDevice; i++) {
-                final KeyedInstanceIdentifier<Table, TableKey> iiToTable = flowCapableNodePath.child(Table.class, new TableKey(i));
-                final ReadTransaction readTx = deviceContext.getReadTransaction();
-                final CheckedFuture<Optional<Table>, ReadFailedException> tableDataFuture = readTx.read(LogicalDatastoreType.OPERATIONAL, iiToTable);
-                try {
-                    final Optional<Table> tableDataOpt = tableDataFuture.get();
-                    if (tableDataOpt.isPresent()) {
-                        final Table tableData = tableDataOpt.get();
-                        final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
-                        deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+            final InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
+            final ReadOnlyTransaction readTx = deviceContext.getReadTransaction();
+            final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
+                    LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
+
+            /* we wish to close readTx for fallBack */
+            Futures.withFallback(flowCapableNodeFuture, new FutureFallback<Optional<FlowCapableNode>>() {
+
+                @Override
+                public ListenableFuture<Optional<FlowCapableNode>> create(final Throwable t) throws Exception {
+                    readTx.close();
+                    return Futures.immediateFailedFuture(t);
+                }
+            });
+            /*
+             * we have to read actual tables with all information before we set empty Flow list, merge is expensive and
+             * not applicable for lists
+             */
+            return Futures.transform(flowCapableNodeFuture, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+                @Override
+                public ListenableFuture<Void> apply(final Optional<FlowCapableNode> flowCapNodeOpt) throws Exception {
+                    if (flowCapNodeOpt.isPresent()) {
+                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow> emptyList()).build();
+                            final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
+                            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                        }
                     }
-                } catch (final InterruptedException e) {
-                    LOG.trace("Reading of table features for table wit ID {} was interrputed.", i);
-                } catch (final ExecutionException e) {
-                    LOG.trace("Reading of table features for table wit ID {} encountered execution exception {}.", i, e);
+                    deviceContext.getDeviceFlowRegistry().removeMarked();
+                    readTx.close();
+                    return Futures.immediateFuture(null);
                 }
-            }
-            deviceContext.getDeviceFlowRegistry().removeMarked();
+
+            });
         }
+        return Futures.immediateFuture(null);
     }
 
     private static void processQueueStatistics(final Iterable<QueueStatisticsUpdate> data, final DeviceContext deviceContext) {
index 3db9c1aca5e75fe07ce3e9ef4c3994da939ae41c..68d05e435aa9816b98414fc4ab6a3fa0dd3f973b 100644 (file)
@@ -3,9 +3,14 @@ package org.opendaylight.openflowplugin.impl.services;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.List;
@@ -17,7 +22,9 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -28,6 +35,11 @@ import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKe
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.MessageIntelligenceAgencyImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -79,10 +91,13 @@ public class MultipartRequestOnTheFlyCallbackTest {
     private GetFeaturesOutput mocketGetFeaturesOutput;
     @Mock
     private DeviceFlowRegistry mockedFlowRegistry;
+    @Mock
+    private ReadOnlyTransaction mockedReadOnlyTx;
 
     private AbstractRequestContext<List<MultipartReply>> dummyRequestContext;
-    private EventIdentifier dummyEventIdentifier = new EventIdentifier(DUMMY_EVENT_NAME, DUMMY_DEVICE_ID);
+    private final EventIdentifier dummyEventIdentifier = new EventIdentifier(DUMMY_EVENT_NAME, DUMMY_DEVICE_ID);
     private MultipartRequestOnTheFlyCallback multipartRequestOnTheFlyCallback;
+    private final short tableId = 0;
 
     @Before
     public void initialization() {
@@ -92,7 +107,9 @@ public class MultipartRequestOnTheFlyCallbackTest {
         when(mockedPrimaryConnection.getFeatures()).thenReturn(mockedFeaturesReply);
         when(mockedFeaturesReply.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
         when(mockedFeaturesReply.getDatapathId()).thenReturn(BigInteger.valueOf(123L));
-        when(mocketGetFeaturesOutput.getTables()).thenReturn((short) 0);
+
+        when(mocketGetFeaturesOutput.getTables()).thenReturn(tableId);
+
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimaryConnection);
         when(mockedDeviceState.getNodeInstanceIdentifier()).thenReturn(NODE_PATH);
         when(mockedDeviceState.getFeatures()).thenReturn(mocketGetFeaturesOutput);
@@ -100,6 +117,14 @@ public class MultipartRequestOnTheFlyCallbackTest {
         when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
         when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(mockedFlowRegistry);
 
+        final InstanceIdentifier<FlowCapableNode> nodePath = mockedDeviceState.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
+        final FlowCapableNodeBuilder flowNodeBuilder = new FlowCapableNodeBuilder();
+        flowNodeBuilder.setTable(Collections.<Table> emptyList());
+        final Optional<FlowCapableNode> flowNodeOpt = Optional.of(flowNodeBuilder.build());
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowNodeFuture = Futures.immediateCheckedFuture(flowNodeOpt);
+        when(mockedReadOnlyTx.read(LogicalDatastoreType.OPERATIONAL, nodePath)).thenReturn(flowNodeFuture);
+        when(mockedDeviceContext.getReadTransaction()).thenReturn(mockedReadOnlyTx);
+
         dummyRequestContext = new AbstractRequestContext<List<MultipartReply>>(DUMMY_XID) {
 
             @Override
@@ -123,7 +148,7 @@ public class MultipartRequestOnTheFlyCallbackTest {
 
     @Test
     public void testOnSuccessWithNotMultiNoMultipart() throws ExecutionException, InterruptedException {
-        HelloMessage mockedHelloMessage = mock(HelloMessage.class);
+        final HelloMessage mockedHelloMessage = mock(HelloMessage.class);
         multipartRequestOnTheFlyCallback.onSuccess(mockedHelloMessage);
 
         final RpcResult<List<MultipartReply>> expectedRpcResult =
@@ -155,7 +180,7 @@ public class MultipartRequestOnTheFlyCallbackTest {
         final MatchBuilder matchBuilder = new MatchBuilder()
                 .setMatchEntry(Collections.<MatchEntry>emptyList());
         final FlowStatsBuilder flowStatsBuilder = new FlowStatsBuilder()
-                .setTableId((short) 0)
+.setTableId(tableId)
                 .setPriority(2)
                 .setCookie(BigInteger.ZERO)
                 .setByteCount(BigInteger.TEN)
@@ -168,17 +193,38 @@ public class MultipartRequestOnTheFlyCallbackTest {
                 .setFlowStats(Collections.singletonList(flowStatsBuilder.build()));
         final MultipartReplyFlowCaseBuilder multipartReplyFlowCaseBuilder = new MultipartReplyFlowCaseBuilder()
                 .setMultipartReplyFlow(multipartReplyFlowBuilder.build());
-        MultipartReplyMessageBuilder mpReplyMessage = new MultipartReplyMessageBuilder()
+        final MultipartReplyMessageBuilder mpReplyMessage = new MultipartReplyMessageBuilder()
                 .setType(MultipartType.OFPMPFLOW)
                 .setFlags(new MultipartRequestFlags(true))
                 .setMultipartReplyBody(multipartReplyFlowCaseBuilder.build())
                 .setXid(21L);
 
-        multipartRequestOnTheFlyCallback.onSuccess(mpReplyMessage.build());
+        final InstanceIdentifier<FlowCapableNode> nodePath = mockedDeviceState.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        final FlowCapableNodeBuilder flowNodeBuilder = new FlowCapableNodeBuilder();
+        final TableBuilder tableDataBld = new TableBuilder();
+        tableDataBld.setId(tableId);
+        flowNodeBuilder.setTable(Collections.singletonList(tableDataBld.build()));
+        final Optional<FlowCapableNode> flowNodeOpt = Optional.of(flowNodeBuilder.build());
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowNodeFuture = Futures
+                .immediateCheckedFuture(flowNodeOpt);
+        when(mockedReadOnlyTx.read(LogicalDatastoreType.OPERATIONAL, nodePath)).thenReturn(flowNodeFuture);
+        when(mockedDeviceContext.getReadTransaction()).thenReturn(mockedReadOnlyTx);
 
-        Mockito.verify(mockedFlowRegistry).storeIfNecessary(Matchers.<FlowRegistryKey>any(), Matchers.anyShort());
-        Mockito.verify(mockedDeviceContext).writeToTransaction(Matchers.eq(LogicalDatastoreType.OPERATIONAL),
-                Matchers.<InstanceIdentifier>any(), Matchers.<DataObject>any());
+        multipartRequestOnTheFlyCallback.onSuccess(mpReplyMessage.build());
+        final InstanceIdentifier<Table> tableIdent = nodePath.child(Table.class, new TableKey(tableId));
+
+        verify(mockedReadOnlyTx, times(1)).read(LogicalDatastoreType.OPERATIONAL, nodePath);
+        verify(mockedReadOnlyTx, times(1)).close();
+        verify(mockedFlowRegistry).storeIfNecessary(Matchers.<FlowRegistryKey> any(), Matchers.anyShort());
+        verify(mockedDeviceContext, times(1)).writeToTransaction(eq(LogicalDatastoreType.OPERATIONAL),
+                eq(tableIdent), Matchers.<Table> any());
+        /*
+         * One call for Table one call for Flow
+         * we are not able to create Flow InstanceIdentifier because we are missing FlowId
+         */
+        verify(mockedDeviceContext, times(2)).writeToTransaction(eq(LogicalDatastoreType.OPERATIONAL),
+                Matchers.<InstanceIdentifier> any(), Matchers.<DataObject> any());
     }
 
     /**
@@ -189,7 +235,7 @@ public class MultipartRequestOnTheFlyCallbackTest {
      */
     @Test
     public void testOnSuccessWithValidMultipart2() throws ExecutionException, InterruptedException {
-        MultipartReplyMessageBuilder mpReplyMessage = new MultipartReplyMessageBuilder()
+        final MultipartReplyMessageBuilder mpReplyMessage = new MultipartReplyMessageBuilder()
                 .setType(MultipartType.OFPMPDESC)
                 .setFlags(new MultipartRequestFlags(false));
 
index c0871b26a8864ba778b1cf73585e302a02a78987..4c7aebdcb923c39ce1441ca4af6f6deb7938fcd0 100644 (file)
@@ -13,7 +13,6 @@ package org.opendaylight.openflowplugin.impl.statistics;
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -35,7 +34,7 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.openflowplugin.api.OFConstants;
@@ -51,6 +50,7 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Stati
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.openflow.md.util.OpenflowPortsUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
@@ -128,7 +128,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table._case.multipart.reply.table.TableStatsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -155,7 +154,7 @@ public class StatisticsGatheringUtilsTest {
     @Mock
     private GetFeaturesOutput features;
     @Mock
-    private ReadTransaction readTx;
+    private ReadOnlyTransaction readTx;
     @Mock
     private ConnectionContext connectionAdapter;
     @Mock
@@ -190,26 +189,26 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testWriteFlowStatistics() {
-        ArgumentCaptor<LogicalDatastoreType> dataStoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
-        ArgumentCaptor<InstanceIdentifier> flowPath = ArgumentCaptor.forClass(InstanceIdentifier.class);
-        ArgumentCaptor<Flow> flow = ArgumentCaptor.forClass(Flow.class);
+        final ArgumentCaptor<LogicalDatastoreType> dataStoreType = ArgumentCaptor.forClass(LogicalDatastoreType.class);
+        final ArgumentCaptor<InstanceIdentifier> flowPath = ArgumentCaptor.forClass(InstanceIdentifier.class);
+        final ArgumentCaptor<Flow> flow = ArgumentCaptor.forClass(Flow.class);
 
         StatisticsGatheringUtils.writeFlowStatistics(prepareFlowStatisticsData(), deviceContext);
 
         Mockito.verify(deviceContext).writeToTransaction(
                 dataStoreType.capture(), flowPath.capture(), flow.capture());
         Assert.assertEquals(LogicalDatastoreType.OPERATIONAL, dataStoreType.getValue());
-        InstanceIdentifier<FlowCapableNode> flowCapableNodePath = flowPath.getValue();
+        final InstanceIdentifier<FlowCapableNode> flowCapableNodePath = flowPath.getValue();
         Assert.assertEquals(DUMMY_NODE_ID, flowCapableNodePath.firstKeyOf(Node.class, NodeKey.class).getId());
         Assert.assertEquals(42, flow.getValue().getTableId().intValue());
     }
 
     private Iterable<FlowsStatisticsUpdate> prepareFlowStatisticsData() {
-        FlowAndStatisticsMapListBuilder flowAndStatsMapListBld = new FlowAndStatisticsMapListBuilder();
+        final FlowAndStatisticsMapListBuilder flowAndStatsMapListBld = new FlowAndStatisticsMapListBuilder();
         flowAndStatsMapListBld.setTableId((short) 42);
         flowAndStatsMapListBld.setMatch(new MatchBuilder().build());
 
-        FlowsStatisticsUpdateBuilder flowStatsUpdateBld1 = new FlowsStatisticsUpdateBuilder();
+        final FlowsStatisticsUpdateBuilder flowStatsUpdateBld1 = new FlowsStatisticsUpdateBuilder();
         flowStatsUpdateBld1.setFlowAndStatisticsMapList(Lists.newArrayList(flowAndStatsMapListBld.build()));
 
         return Lists.newArrayList(flowStatsUpdateBld1.build());
@@ -218,10 +217,10 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_group() throws Exception {
-        MultipartType type = MultipartType.OFPMPGROUP;
+        final MultipartType type = MultipartType.OFPMPGROUP;
         final long groupIdValue = 19L;
 
-        GroupStatsBuilder groupStatsBld = new GroupStatsBuilder()
+        final GroupStatsBuilder groupStatsBld = new GroupStatsBuilder()
                 .setBucketStats(Lists.newArrayList(createBucketStat(21L, 42L)))
                 .setByteCount(BigInteger.valueOf(84L))
                 .setPacketCount(BigInteger.valueOf(63L))
@@ -229,13 +228,13 @@ public class StatisticsGatheringUtilsTest {
                 .setDurationNsec(12L)
                 .setRefCount(13L)
                 .setGroupId(new GroupId(groupIdValue));
-        MultipartReplyGroupBuilder mpReplyGroupBld = new MultipartReplyGroupBuilder();
+        final MultipartReplyGroupBuilder mpReplyGroupBld = new MultipartReplyGroupBuilder();
         mpReplyGroupBld.setGroupStats(Lists.newArrayList(groupStatsBld.build()));
-        MultipartReplyGroupCaseBuilder mpReplyGroupCaseBld = new MultipartReplyGroupCaseBuilder();
+        final MultipartReplyGroupCaseBuilder mpReplyGroupCaseBld = new MultipartReplyGroupCaseBuilder();
         mpReplyGroupCaseBld.setMultipartReplyGroup(mpReplyGroupBld.build());
 
-        MultipartReply groupStatsUpdated = assembleMPReplyMessage(type, mpReplyGroupCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(groupStatsUpdated);
+        final MultipartReply groupStatsUpdated = assembleMPReplyMessage(type, mpReplyGroupCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(groupStatsUpdated);
 
         fireAndCheck(type, statsData);
 
@@ -249,27 +248,27 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_groupDesc() throws Exception {
-        MultipartType type = MultipartType.OFPMPGROUPDESC;
+        final MultipartType type = MultipartType.OFPMPGROUPDESC;
         final long groupIdValue = 27L;
 
-        BucketsListBuilder bucketsListBld = new BucketsListBuilder()
+        final BucketsListBuilder bucketsListBld = new BucketsListBuilder()
                 .setWatchPort(new PortNumber(5L));
-        GroupDescBuilder groupStatsBld = new GroupDescBuilder()
+        final GroupDescBuilder groupStatsBld = new GroupDescBuilder()
                 .setBucketsList(Lists.newArrayList(bucketsListBld.build()))
                 .setGroupId(new GroupId(groupIdValue))
                 .setType(GroupType.OFPGTALL);
-        MultipartReplyGroupDescBuilder mpReplyGroupBld = new MultipartReplyGroupDescBuilder();
+        final MultipartReplyGroupDescBuilder mpReplyGroupBld = new MultipartReplyGroupDescBuilder();
         mpReplyGroupBld.setGroupDesc(Lists.newArrayList(groupStatsBld.build()));
-        MultipartReplyGroupDescCaseBuilder mpReplyGroupCaseBld = new MultipartReplyGroupDescCaseBuilder();
+        final MultipartReplyGroupDescCaseBuilder mpReplyGroupCaseBld = new MultipartReplyGroupDescCaseBuilder();
         mpReplyGroupCaseBld.setMultipartReplyGroupDesc(mpReplyGroupBld.build());
 
-        MultipartReply groupStatsUpdated = assembleMPReplyMessage(type, mpReplyGroupCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(groupStatsUpdated);
+        final MultipartReply groupStatsUpdated = assembleMPReplyMessage(type, mpReplyGroupCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(groupStatsUpdated);
 
         fireAndCheck(type, statsData);
 
-        org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId storedGroupId = new org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId(groupIdValue);
-        KeyedInstanceIdentifier<Group, GroupKey> groupPath = dummyNodePath.augmentation(FlowCapableNode.class).child(Group.class, new GroupKey(storedGroupId));
+        final org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId storedGroupId = new org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId(groupIdValue);
+        final KeyedInstanceIdentifier<Group, GroupKey> groupPath = dummyNodePath.augmentation(FlowCapableNode.class).child(Group.class, new GroupKey(storedGroupId));
 
         verify(deviceContext, Mockito.never()).addDeleteToTxChain(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.any(InstanceIdentifier.class));
         verify(deviceGroupRegistry).removeMarked();
@@ -280,13 +279,13 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_meter() throws Exception {
-        MultipartType type = MultipartType.OFPMPMETER;
+        final MultipartType type = MultipartType.OFPMPMETER;
         final long meterIdValue = 19L;
 
-        MeterBandStatsBuilder meterBandStatsBld = new MeterBandStatsBuilder()
+        final MeterBandStatsBuilder meterBandStatsBld = new MeterBandStatsBuilder()
                 .setByteBandCount(BigInteger.valueOf(91L))
                 .setPacketBandCount(BigInteger.valueOf(92L));
-        MeterStatsBuilder meterStatsBld = new MeterStatsBuilder()
+        final MeterStatsBuilder meterStatsBld = new MeterStatsBuilder()
                 .setMeterId(new MeterId(meterIdValue))
                 .setByteInCount(BigInteger.valueOf(111L))
                 .setDurationSec(112L)
@@ -294,17 +293,17 @@ public class StatisticsGatheringUtilsTest {
                 .setFlowCount(114L)
                 .setPacketInCount(BigInteger.valueOf(115L))
                 .setMeterBandStats(Lists.newArrayList(meterBandStatsBld.build()));
-        MultipartReplyMeterBuilder mpReplyMeterBld = new MultipartReplyMeterBuilder();
+        final MultipartReplyMeterBuilder mpReplyMeterBld = new MultipartReplyMeterBuilder();
         mpReplyMeterBld.setMeterStats(Lists.newArrayList(meterStatsBld.build()));
-        MultipartReplyMeterCaseBuilder mpReplyMeterCaseBld = new MultipartReplyMeterCaseBuilder();
+        final MultipartReplyMeterCaseBuilder mpReplyMeterCaseBld = new MultipartReplyMeterCaseBuilder();
         mpReplyMeterCaseBld.setMultipartReplyMeter(mpReplyMeterBld.build());
 
-        MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyMeterCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
+        final MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyMeterCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
 
         fireAndCheck(type, statsData);
 
-        InstanceIdentifier<MeterStatistics> meterPath = dummyNodePath.augmentation(FlowCapableNode.class)
+        final InstanceIdentifier<MeterStatistics> meterPath = dummyNodePath.augmentation(FlowCapableNode.class)
                 .child(Meter.class, new MeterKey(new org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId(meterIdValue)))
                 .augmentation(NodeMeterStatistics.class)
                 .child(MeterStatistics.class);
@@ -314,21 +313,21 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_nodeConnector() throws Exception {
-        MultipartType type = MultipartType.OFPMPPORTSTATS;
+        final MultipartType type = MultipartType.OFPMPPORTSTATS;
 
-        PortStatsBuilder portStatsBld = new PortStatsBuilder()
+        final PortStatsBuilder portStatsBld = new PortStatsBuilder()
                 .setPortNo(11L);
-        MultipartReplyPortStatsBuilder mpReplyMeterBld = new MultipartReplyPortStatsBuilder();
+        final MultipartReplyPortStatsBuilder mpReplyMeterBld = new MultipartReplyPortStatsBuilder();
         mpReplyMeterBld.setPortStats(Lists.newArrayList(portStatsBld.build()));
-        MultipartReplyPortStatsCaseBuilder mpReplyMeterCaseBld = new MultipartReplyPortStatsCaseBuilder();
+        final MultipartReplyPortStatsCaseBuilder mpReplyMeterCaseBld = new MultipartReplyPortStatsCaseBuilder();
         mpReplyMeterCaseBld.setMultipartReplyPortStats(mpReplyMeterBld.build());
 
-        MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyMeterCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
+        final MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyMeterCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
 
         fireAndCheck(type, statsData);
 
-        InstanceIdentifier<FlowCapableNodeConnectorStatistics> portPath = dummyNodePath
+        final InstanceIdentifier<FlowCapableNodeConnectorStatistics> portPath = dummyNodePath
                 .child(NodeConnector.class, new NodeConnectorKey(new NodeConnectorId("openflow:" + DUMMY_NODE_ID_VALUE + ":11")))
                 .augmentation(FlowCapableNodeConnectorStatisticsData.class)
                 .child(FlowCapableNodeConnectorStatistics.class);
@@ -340,24 +339,24 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_table() throws Exception {
-        MultipartType type = MultipartType.OFPMPTABLE;
+        final MultipartType type = MultipartType.OFPMPTABLE;
 
-        TableStatsBuilder tableStatsBld = new TableStatsBuilder()
+        final TableStatsBuilder tableStatsBld = new TableStatsBuilder()
                 .setActiveCount(33L)
                 .setLookupCount(BigInteger.valueOf(34L))
                 .setMatchedCount(BigInteger.valueOf(35L))
                 .setTableId((short) 0);
-        MultipartReplyTableBuilder mpReplyTableBld = new MultipartReplyTableBuilder();
+        final MultipartReplyTableBuilder mpReplyTableBld = new MultipartReplyTableBuilder();
         mpReplyTableBld.setTableStats(Lists.newArrayList(tableStatsBld.build()));
-        MultipartReplyTableCaseBuilder mpReplyTableCaseBld = new MultipartReplyTableCaseBuilder();
+        final MultipartReplyTableCaseBuilder mpReplyTableCaseBld = new MultipartReplyTableCaseBuilder();
         mpReplyTableCaseBld.setMultipartReplyTable(mpReplyTableBld.build());
 
-        MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyTableCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
+        final MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyTableCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
 
         fireAndCheck(type, statsData);
 
-        InstanceIdentifier<FlowTableStatistics> tablePath = dummyNodePath
+        final InstanceIdentifier<FlowTableStatistics> tablePath = dummyNodePath
                 .augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey((short) 0))
                 .augmentation(FlowTableStatisticsData.class)
@@ -370,10 +369,10 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_queue() throws Exception {
-        MultipartType type = MultipartType.OFPMPQUEUE;
+        final MultipartType type = MultipartType.OFPMPQUEUE;
 
-        long queueIdValue = 4L;
-        QueueStatsBuilder queueStatsBld = new QueueStatsBuilder()
+        final long queueIdValue = 4L;
+        final QueueStatsBuilder queueStatsBld = new QueueStatsBuilder()
                 .setPortNo(11L)
                 .setTxBytes(BigInteger.valueOf(44L))
                 .setTxErrors(BigInteger.valueOf(45L))
@@ -382,17 +381,17 @@ public class StatisticsGatheringUtilsTest {
                 .setDurationNsec(48L)
                 .setQueueId(queueIdValue);
 
-        MultipartReplyQueueBuilder mpReplyQueueBld = new MultipartReplyQueueBuilder();
+        final MultipartReplyQueueBuilder mpReplyQueueBld = new MultipartReplyQueueBuilder();
         mpReplyQueueBld.setQueueStats(Lists.newArrayList(queueStatsBld.build()));
-        MultipartReplyQueueCaseBuilder mpReplyQueueCaseBld = new MultipartReplyQueueCaseBuilder();
+        final MultipartReplyQueueCaseBuilder mpReplyQueueCaseBld = new MultipartReplyQueueCaseBuilder();
         mpReplyQueueCaseBld.setMultipartReplyQueue(mpReplyQueueBld.build());
 
-        MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyQueueCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
+        final MultipartReply meterStatsUpdated = assembleMPReplyMessage(type, mpReplyQueueCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(meterStatsUpdated);
 
         fireAndCheck(type, statsData);
 
-        KeyedInstanceIdentifier<Queue, QueueKey> queuePath = dummyNodePath
+        final KeyedInstanceIdentifier<Queue, QueueKey> queuePath = dummyNodePath
                 .child(NodeConnector.class, new NodeConnectorKey(new NodeConnectorId("openflow:" + DUMMY_NODE_ID_VALUE + ":11")))
                 .augmentation(FlowCapableNodeConnector.class)
                 .child(Queue.class, new QueueKey(new QueueId(queueIdValue)));
@@ -404,14 +403,14 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_flow() throws Exception {
-        MultipartType type = MultipartType.OFPMPFLOW;
+        final MultipartType type = MultipartType.OFPMPFLOW;
         when(deviceFlowRegistry.storeIfNecessary(Matchers.any(FlowRegistryKey.class), Matchers.anyShort()))
                 .thenReturn(new FlowId("openflow:21"));
 
-        org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.match.grouping.MatchBuilder matchBld =
+        final org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.match.grouping.MatchBuilder matchBld =
                 new org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.match.grouping.MatchBuilder()
                         .setMatchEntry(Collections.<MatchEntry>emptyList());
-        FlowStatsBuilder flowStatsBld = new FlowStatsBuilder()
+        final FlowStatsBuilder flowStatsBld = new FlowStatsBuilder()
                 .setByteCount(BigInteger.valueOf(55L))
                 .setPacketCount(BigInteger.valueOf(56L))
                 .setDurationSec(57L)
@@ -420,19 +419,19 @@ public class StatisticsGatheringUtilsTest {
                 .setMatch(matchBld.build())
                 .setFlags(new FlowModFlags(true, false, false, false, true));
 
-        MultipartReplyFlowBuilder mpReplyFlowBld = new MultipartReplyFlowBuilder();
+        final MultipartReplyFlowBuilder mpReplyFlowBld = new MultipartReplyFlowBuilder();
         mpReplyFlowBld.setFlowStats(Lists.newArrayList(flowStatsBld.build()));
-        MultipartReplyFlowCaseBuilder mpReplyFlowCaseBld = new MultipartReplyFlowCaseBuilder();
+        final MultipartReplyFlowCaseBuilder mpReplyFlowCaseBld = new MultipartReplyFlowCaseBuilder();
         mpReplyFlowCaseBld.setMultipartReplyFlow(mpReplyFlowBld.build());
 
-        MultipartReply flowStatsUpdated = assembleMPReplyMessage(type, mpReplyFlowCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(flowStatsUpdated);
+        final MultipartReply flowStatsUpdated = assembleMPReplyMessage(type, mpReplyFlowCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(flowStatsUpdated);
         fireAndCheck(type, statsData);
 
-        FlowBuilder flowBld = new FlowBuilder()
+        final FlowBuilder flowBld = new FlowBuilder()
                 .setTableId((short) 0)
                 .setMatch(new MatchBuilder().build());
-        KeyedInstanceIdentifier<Flow, FlowKey> flowPath = dummyNodePath.augmentation(FlowCapableNode.class)
+        final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = dummyNodePath.augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey((short) 0))
                 .child(Flow.class, new FlowKey(new FlowId("openflow:21")));
         verify(deviceContext, Mockito.never()).addDeleteToTxChain(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.any(InstanceIdentifier.class));
@@ -442,43 +441,43 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testGatherStatistics_meterConfig() throws Exception {
-        MultipartType type = MultipartType.OFPMPMETERCONFIG;
+        final MultipartType type = MultipartType.OFPMPMETERCONFIG;
         final Long meterIdValue = 55L;
 
-        MeterConfigBuilder meterConfigBld = new MeterConfigBuilder()
+        final MeterConfigBuilder meterConfigBld = new MeterConfigBuilder()
                 .setMeterId(new MeterId(meterIdValue))
                 .setFlags(new MeterFlags(false, true, false, true))
                 .setBands(Collections.<Bands>emptyList());
 
-        MultipartReplyMeterConfigBuilder mpReplyMeterConfigBld = new MultipartReplyMeterConfigBuilder();
+        final MultipartReplyMeterConfigBuilder mpReplyMeterConfigBld = new MultipartReplyMeterConfigBuilder();
         mpReplyMeterConfigBld.setMeterConfig(Lists.newArrayList(meterConfigBld.build()));
-        MultipartReplyMeterConfigCaseBuilder mpReplyMeterConfigCaseBld = new MultipartReplyMeterConfigCaseBuilder();
+        final MultipartReplyMeterConfigCaseBuilder mpReplyMeterConfigCaseBld = new MultipartReplyMeterConfigCaseBuilder();
         mpReplyMeterConfigCaseBld.setMultipartReplyMeterConfig(mpReplyMeterConfigBld.build());
 
-        MultipartReply meterConfigUpdated = assembleMPReplyMessage(type, mpReplyMeterConfigCaseBld.build());
-        List<MultipartReply> statsData = Collections.singletonList(meterConfigUpdated);
+        final MultipartReply meterConfigUpdated = assembleMPReplyMessage(type, mpReplyMeterConfigCaseBld.build());
+        final List<MultipartReply> statsData = Collections.singletonList(meterConfigUpdated);
 
         fireAndCheck(type, statsData);
 
         final org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId meterId =
                 new org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId(meterIdValue);
-        KeyedInstanceIdentifier<Meter, MeterKey> meterPath = dummyNodePath.augmentation(FlowCapableNode.class)
+        final KeyedInstanceIdentifier<Meter, MeterKey> meterPath = dummyNodePath.augmentation(FlowCapableNode.class)
                 .child(Meter.class, new MeterKey(meterId));
         verify(deviceContext, Mockito.never()).addDeleteToTxChain(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.any(InstanceIdentifier.class));
         verify(deviceMeterRegistry).store(meterId);
         verify(deviceContext).writeToTransaction(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(meterPath), Matchers.any(Meter.class));
     }
 
-    private void fireAndCheck(MultipartType type, List<MultipartReply> statsData) throws InterruptedException, ExecutionException, TimeoutException {
+    private void fireAndCheck(final MultipartType type, final List<MultipartReply> statsData) throws InterruptedException, ExecutionException, TimeoutException {
         when(statisticsService.getStatisticsOfType(Matchers.any(EventIdentifier.class), Matchers.eq(type)))
                 .thenReturn(Futures.immediateFuture(RpcResultBuilder.success(statsData).build()));
 
-        ListenableFuture<Boolean> gatherStatisticsResult = StatisticsGatheringUtils.gatherStatistics(statisticsService, deviceContext, type);
+        final ListenableFuture<Boolean> gatherStatisticsResult = StatisticsGatheringUtils.gatherStatistics(statisticsService, deviceContext, type);
         Assert.assertTrue(gatherStatisticsResult.get(1, TimeUnit.SECONDS).booleanValue());
         verify(deviceContext).submitTransaction();
     }
 
-    private static MultipartReplyMessage assembleMPReplyMessage(MultipartType type, MultipartReplyBody mpReplyGroupCaseBld) {
+    private static MultipartReplyMessage assembleMPReplyMessage(final MultipartType type, final MultipartReplyBody mpReplyGroupCaseBld) {
         return new MultipartReplyMessageBuilder()
                 .setMultipartReplyBody(mpReplyGroupCaseBld)
                 .setType(type)
@@ -500,19 +499,21 @@ public class StatisticsGatheringUtilsTest {
 
     @Test
     public void testDeleteAllKnownFlows() throws Exception {
+        final short tableId = 0;
         when(deviceState.deviceSynchronized()).thenReturn(true);
-        when(features.getTables()).thenReturn((short) 1);
-        KeyedInstanceIdentifier<Table, TableKey> tablePath = deviceState.getNodeInstanceIdentifier()
-                .augmentation(FlowCapableNode.class)
-                .child(Table.class, new TableKey((short) 0));
+        final InstanceIdentifier<FlowCapableNode> nodePath = deviceState.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
+        final TableBuilder tableDataBld = new TableBuilder();
+        tableDataBld.setId(tableId);
+        final FlowCapableNodeBuilder flowNodeBuilder = new FlowCapableNodeBuilder();
+        flowNodeBuilder.setTable(Collections.singletonList(tableDataBld.build()));
+        final Optional<FlowCapableNode> flowNodeOpt = Optional.of(flowNodeBuilder.build());
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowNodeFuture = Futures.immediateCheckedFuture(flowNodeOpt);
+        when(readTx.read(LogicalDatastoreType.OPERATIONAL, nodePath)).thenReturn(flowNodeFuture);
+        final KeyedInstanceIdentifier<Table, TableKey> tablePath = deviceState.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId));
 
-        TableBuilder tableDataBld = new TableBuilder();
-        Optional<Table> tableDataOpt = Optional.of(tableDataBld.build());
-        CheckedFuture<Optional<Table>, ReadFailedException> tableDataFuture = Futures.immediateCheckedFuture(tableDataOpt);
-        when(readTx.read(LogicalDatastoreType.OPERATIONAL, tablePath)).thenReturn(tableDataFuture);
         StatisticsGatheringUtils.deleteAllKnownFlows(deviceContext);
 
-
         verify(deviceContext).writeToTransaction(
                 LogicalDatastoreType.OPERATIONAL,
                 tablePath,