Bug 2945: Fix read failures in StatAbstractListenCommit 84/17684/3
authorTom Pantelis <tpanteli@brocade.com>
Thu, 2 Apr 2015 19:53:37 +0000 (15:53 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 2 Apr 2015 21:43:52 +0000 (17:43 -0400)
Added a volatile currentReadTxStale flag which onDataChanged sets instead of
closing the currentReadTx. readLatestConfiguration checks the
currentReadTxStale flag and, if set, closes the currentReadTx and
creates a new one. This prevents onDataChanged from closing the Tx when
a read is in-flight.

Added a unit test class StatAbstractListenCommitTest with test cases for
the methods that were changed.

I also changed StatListenCommitFlow#initConfigFlows to create an
ArrayList when copying the Flow List as Linkedist incurs more memory
overhead.

Change-Id: I42e5b5d55eeddd62b0ffb002d35d07b664c91861
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
applications/statistics-manager/pom.xml
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommit.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitFlow.java
applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommitTest.java [new file with mode: 0644]

index 8f626c57e5b614db39ec6e74e84e783b67351dac..0fa2c593d146a011793ff9e176d31467a73fd893 100644 (file)
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 05315108b4718212e4b70050d2826e44771b2e7d..66f5660c7d162a0e90d92cfbed988945ab59f419 100644 (file)
@@ -8,9 +8,10 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
@@ -29,9 +30,6 @@ import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
 /**
  * statistics-manager
  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -57,7 +55,8 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
 
     private final DataBroker dataBroker;
 
-    private volatile ReadOnlyTransaction currentReadTx;
+    private ReadOnlyTransaction currentReadTx;
+    private volatile boolean currentReadTxStale;
 
     /* Constructor has to make a registration */
     public StatAbstractListenCommit(final StatisticsManager manager, final DataBroker db,
@@ -80,17 +79,13 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
         Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
+
         /*
-         * If we have opened read transaction for configuration data store,
-         * we will close and null it.
+         * If we have opened read transaction for configuration data store, we need to mark it as stale.
          *
          * Latest read transaction will be allocated on another read using readLatestConfiguration
          */
-        if(currentReadTx != null) {
-            final ReadOnlyTransaction previous = currentReadTx;
-            currentReadTx = null;
-            previous.close();
-        }
+        currentReadTxStale = true;
     }
 
     @SuppressWarnings("unchecked")
@@ -134,15 +129,36 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
      * @return
      */
     protected final <K extends DataObject> Optional<K> readLatestConfiguration(final InstanceIdentifier<K> path) {
-        if(currentReadTx == null) {
-             currentReadTx = dataBroker.newReadOnlyTransaction();
-        }
-        try {
-            return currentReadTx.read(LogicalDatastoreType.CONFIGURATION, path).checkedGet();
-        } catch (final ReadFailedException e) {
-            LOG.debug("It wasn't possible to read {} from datastore. Exception: {}", path, e);
-            return Optional.absent();
+        for(int i = 0; i < 2; i++) {
+            boolean localReadTxStale = currentReadTxStale;
+
+            // This non-volatile read piggy backs the volatile currentReadTxStale read above to
+            // ensure visibility in case this method is called across threads (although not concurrently).
+            ReadOnlyTransaction localReadTx = currentReadTx;
+            if(localReadTx == null || localReadTxStale) {
+                if(localReadTx != null) {
+                    localReadTx.close();
+                }
+
+                localReadTx = dataBroker.newReadOnlyTransaction();
+
+                currentReadTx = localReadTx;
+
+                // Note - this volatile write also publishes the non-volatile currentReadTx write above.
+                currentReadTxStale = false;
+            }
+
+            try {
+                return localReadTx.read(LogicalDatastoreType.CONFIGURATION, path).checkedGet();
+            } catch (final ReadFailedException e) {
+                LOG.debug("It wasn't possible to read {} from datastore. Exception: {}", path, e);
+
+                // Loop back and try again with a new Tx.
+                currentReadTxStale = true;
+            }
         }
+
+        return Optional.absent();
     }
 }
 
index 9d8230c61193376810ea66f50a40c9a57d1d75a1..81abddfb0e0501e45942934499a5e2f41513f46b 100644 (file)
@@ -8,16 +8,17 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -64,10 +65,6 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
 /**
  * statistics-manager
  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -340,7 +337,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             else
                             {
                                 LOG.warn("flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for more detail.",
-                                    flowHashId.getFlowId().toString().substring(0, Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,flowHashId.getFlowId().toString().length())), 
+                                    flowHashId.getFlowId().toString().substring(0, Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,flowHashId.getFlowId().toString().length())),
                                     e.getMessage().substring(0,Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,e.getMessage().length())));
                             }
                         }
@@ -385,7 +382,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             if(localList == null) {
                 configFlows = Collections.emptyList();
             } else {
-                configFlows = new LinkedList<>(localList);
+                configFlows = new ArrayList<>(localList);
             }
         }
 
diff --git a/applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommitTest.java b/applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommitTest.java
new file mode 100644 (file)
index 0000000..cbddfbf
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+
+/**
+ * Unit tests for StatAbstractListenCommit.
+ *
+ * @author Thomas Pantelis
+ */
+public class StatAbstractListenCommitTest {
+
+    @Mock
+    private NotificationProviderService mockNotificationProviderService;
+
+    @Mock
+    private StatisticsManager mockStatisticsManager;
+
+    @Mock
+    private DataBroker mockDataBroker;
+
+    @Mock
+    private NotificationListener mockNotificationListener;
+
+    @SuppressWarnings("rawtypes")
+    private StatAbstractListenCommit statCommit;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        statCommit = new StatAbstractListenCommit(mockStatisticsManager, mockDataBroker,
+                mockNotificationProviderService, DataObject.class) {
+            @Override
+            protected InstanceIdentifier getWildCardedRegistrationPath() {
+                return InstanceIdentifier.create(DataObject.class);
+            }
+
+            @Override
+            protected NotificationListener getStatNotificationListener() {
+                return mockNotificationListener;
+            }
+        };
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadLatestConfiguration() {
+
+        InstanceIdentifier<DataObject> path = InstanceIdentifier.create(DataObject.class);
+
+        ReadOnlyTransaction mockReadTx = mock(ReadOnlyTransaction.class);
+        doReturn(mockReadTx).when(mockDataBroker).newReadOnlyTransaction();
+
+        Optional<DataObject> expected = Optional.of(mock(DataObject.class));
+        doReturn(Futures.immediateCheckedFuture(expected)).when(mockReadTx).read(
+                LogicalDatastoreType.CONFIGURATION, path);
+
+        Optional<DataObject> actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected, actual);
+
+        actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected, actual);
+
+        verify(mockReadTx, never()).close();
+        verify(mockDataBroker).newReadOnlyTransaction();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadLatestConfigurationWithReadFailure() {
+
+        InstanceIdentifier<DataObject> path = InstanceIdentifier.create(DataObject.class);
+
+        ReadOnlyTransaction mockReadTx1 = mock(ReadOnlyTransaction.class);
+        ReadOnlyTransaction mockReadTx2 = mock(ReadOnlyTransaction.class);
+        ReadOnlyTransaction mockReadTx3 = mock(ReadOnlyTransaction.class);
+        doReturn(mockReadTx1).doReturn(mockReadTx2).doReturn(mockReadTx3).when(mockDataBroker).newReadOnlyTransaction();
+
+        doReturn(Futures.immediateFailedCheckedFuture(new ReadFailedException("mock"))).when(mockReadTx1).read(
+                LogicalDatastoreType.CONFIGURATION, path);
+
+        doReturn(Futures.immediateFailedCheckedFuture(new ReadFailedException("mock"))).when(mockReadTx2).read(
+                LogicalDatastoreType.CONFIGURATION, path);
+
+        Optional<DataObject> expected = Optional.of(mock(DataObject.class));
+        doReturn(Futures.immediateCheckedFuture(expected)).when(mockReadTx3).read(
+                LogicalDatastoreType.CONFIGURATION, path);
+
+        Optional<DataObject> actual = statCommit.readLatestConfiguration(path);
+
+        assertEquals("Optional isPresent", false, actual.isPresent());
+
+        actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected, actual);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadLatestConfigurationWithInterveningOnDataChanged() {
+
+        InstanceIdentifier<DataObject> path = InstanceIdentifier.create(DataObject.class);
+
+        ReadOnlyTransaction mockReadTx1 = mock(ReadOnlyTransaction.class);
+        ReadOnlyTransaction mockReadTx2 = mock(ReadOnlyTransaction.class);
+        doReturn(mockReadTx1).doReturn(mockReadTx2).when(mockDataBroker).newReadOnlyTransaction();
+
+        final Optional<DataObject> expected1 = Optional.of(mock(DataObject.class));
+        Answer<CheckedFuture<Optional<DataObject>, ReadFailedException>> answer =
+                new Answer<CheckedFuture<Optional<DataObject>, ReadFailedException>>() {
+                    @Override
+                    public CheckedFuture<Optional<DataObject>, ReadFailedException> answer(
+                            InvocationOnMock unused) {
+                        statCommit.onDataChanged(mock(AsyncDataChangeEvent.class));
+                        return Futures.immediateCheckedFuture(expected1);
+                    }
+                };
+
+        doAnswer(answer).when(mockReadTx1).read(LogicalDatastoreType.CONFIGURATION, path);
+
+        Optional<DataObject> expected2 = Optional.of(mock(DataObject.class));
+        doReturn(Futures.immediateCheckedFuture(expected2)).when(mockReadTx2).read(
+                LogicalDatastoreType.CONFIGURATION, path);
+
+        Optional<DataObject> actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected1, actual);
+
+        actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected2, actual);
+
+        actual = statCommit.readLatestConfiguration(path);
+
+        assertSame("Optional instance", expected2, actual);
+
+        verify(mockReadTx1).close();
+        verify(mockReadTx2, never()).close();
+        verify(mockDataBroker, times(2)).newReadOnlyTransaction();
+    }
+}