BUG-8403: guard against ConcurrentModificationException 69/57569/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 15:21:42 +0000 (17:21 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 18:17:09 +0000 (20:17 +0200)
Using TransmitQueue.asIterable() offers slight advantage of not
dealing with a big list, but exposes us to the risk of the Iterable
being changed.

The point missed by the fix to BUG 8491 is that there is an avenue
for the old connection to be touched during replay, as we are
completing entries, for example reads when we are switching from
remote to local connection. In this case the callback will be invoked
in the actor thread, with all the locks being reentrant and held,
hence it can break through to the old connection's queue.

If that happens we will see a ConcurrentModificationException and
enter a buggy territory, where the client fails to work properly.

Document this caveat and turn asIterable() into drain(), which
removes all the entries in the queue, allowing new entries to be
enqueued. The late-comer entries are accounted for when we set the
forwarder.

Change-Id: Idf29c1e565e12aaed917ac94c21c552daf169d4d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 930747a6ba5d888d2fbe54473132680e4621d858)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index 47c0676..da016ba 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -156,28 +156,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     public abstract Optional<T> getBackendInfo();
 
-    final Iterable<ConnectionEntry> startReplay() {
+    final Collection<ConnectionEntry> startReplay() {
         lock.lock();
-        return queue.asIterable();
+        return queue.drain();
     }
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder);
+        setForwarder(forwarder);
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        final long now = currentTime();
-        final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
-        while (it.hasNext()) {
-            final ConnectionEntry e = it.next();
-            forwarder.forwardEntry(e, now);
-            it.remove();
-        }
-
-        queue.setForwarder(forwarder);
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
index ca78d0c..5d61885 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -54,7 +55,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
          * @param enqueuedEntries Previously-enqueued entries
          * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
          */
-        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection<ConnectionEntry> enqueuedEntries);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
@@ -286,7 +287,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
 
             // Lock the old connection and get a reference to its entries
-            final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+            final Collection<ConnectionEntry> replayIterable = conn.startReplay();
 
             // Finish the connection attempt
             final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
index 28902d2..b754341 100644 (file)
@@ -11,9 +11,9 @@ import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
@@ -106,8 +106,20 @@ abstract class TransmitQueue {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
-    final Iterable<ConnectionEntry> asIterable() {
-        return Iterables.concat(inflight, pending);
+    /**
+     * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+     * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+     * returns and the successor is set will be replayed to the successor.
+     *
+     * @return Collection of entries present in the queue.
+     */
+    final Collection<ConnectionEntry> drain() {
+        final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+        ret.addAll(inflight);
+        ret.addAll(pending);
+        inflight.clear();
+        pending.clear();
+        return ret;
     }
 
     final long ticksStalling(final long now) {
@@ -232,13 +244,32 @@ abstract class TransmitQueue {
         poisonQueue(pending, cause);
     }
 
-    final void setForwarder(final ReconnectForwarder forwarder) {
+    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
-        Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
         successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}", this, successor);
+        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+        /*
+         * We need to account for entries which have been added between the time drain() was called and this method
+         * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+         * replay thread, there was an avenue for this to happen.
+         */
+        int count = 0;
+        ConnectionEntry entry = inflight.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = inflight.poll();
+            count++;
+        }
+
+        entry = pending.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = pending.poll();
+            count++;
+        }
+
+        LOG.debug("Connection {} queue spliced {} messages", this, count);
     }
 
     final void remove(final long now) {
index 8e07804..b5f1bda 100644 (file)
@@ -17,7 +17,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import com.google.common.base.Ticker;
-import com.google.common.collect.Iterables;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
@@ -74,15 +74,15 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testAsIterable() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
             queue.enqueue(new ConnectionEntry(request, callback, now), now);
         }
-        final Iterable<ConnectionEntry> entries = queue.asIterable();
-        Assert.assertEquals(sentMessages, Iterables.size(entries));
+        final Collection<ConnectionEntry> entries = queue.drain();
+        Assert.assertEquals(sentMessages, entries.size());
         Assert.assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
@@ -97,7 +97,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long requestSequence = 0L;
         final long txSequence = 0L;
         final long sessionId = 0L;
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
@@ -133,7 +133,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
     @Test
     public void testIsEmpty() throws Exception {
         Assert.assertTrue(queue.isEmpty());
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
@@ -142,8 +142,8 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testPeek() throws Exception {
-        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
-        final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
@@ -155,7 +155,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testPoison() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
index 752c127..6dd38ce 100644 (file)
@@ -19,7 +19,6 @@ import static org.opendaylight.controller.cluster.access.client.ConnectionEntryM
 import com.google.common.base.Ticker;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
 import java.util.Arrays;
 import java.util.Collection;
@@ -111,8 +110,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
             probe.expectMsgClass(RequestEnvelope.class);
         }
         probe.expectNoMsg();
-        final Iterable<ConnectionEntry> entries = queue.asIterable();
-        assertEquals(sentMessages, Iterables.size(entries));
+        final Collection<ConnectionEntry> entries = queue.drain();
+        assertEquals(sentMessages, entries.size());
         assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
@@ -143,7 +142,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
-        queue.setForwarder(forwarder);
+        queue.setForwarder(forwarder, ticker.read());
         final long secondEnqueueNow = ticker.read();
         queue.enqueue(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
index dae12af..1e8d03a 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -306,7 +307,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
             }
 
             @Override
-            void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+            void replayRequests(final Collection<ConnectionEntry> previousEntries) {
                 proxy.replayRequests(previousEntries);
             }
 
index f1939dc..b5fcca6 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Collection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 
 /**
@@ -18,7 +19,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 abstract class HistoryReconnectCohort implements AutoCloseable {
     abstract ProxyReconnectCohort getProxy();
 
-    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
 
     @Override
     public abstract void close();
index 802d9ed..7c3b201 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -212,7 +213,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
-        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
             Iterator<ConnectionEntry> it = previousEntries.iterator();
             while (it.hasNext()) {
index 0ad4dc5..14bf064 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
@@ -18,7 +19,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
 
-    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
 
     abstract ProxyHistory finishReconnect();
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.