BUG-8403: guard against ConcurrentModificationException 38/57438/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 15:21:42 +0000 (17:21 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 15:39:38 +0000 (17:39 +0200)
Using TransmitQueue.asIterable() offers slight advantage of not
dealing with a big list, but exposes us to the risk of the Iterable
being changed.

The point missed by the fix to BUG 8491 is that there is an avenue
for the old connection to be touched during replay, as we are
completing entries, for example reads when we are switching from
remote to local connection. In this case the callback will be invoked
in the actor thread, with all the locks being reentrant and held,
hence it can break through to the old connection's queue.

If that happens we will see a ConcurrentModificationException and
enter a buggy territory, where the client fails to work properly.

Document this caveat and turn asIterable() into drain(), which
removes all the entries in the queue, allowing new entries to be
enqueued. The late-comer entries are accounted for when we set the
forwarder.

Change-Id: Idf29c1e565e12aaed917ac94c21c552daf169d4d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index 21e5f67a74cd1491427e2622d47315fc63ad1ae1..cd81a4e5444ca9b47a1f51a94ff79796afa106a2 100644 (file)
@@ -13,7 +13,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -158,28 +158,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     public abstract Optional<T> getBackendInfo();
 
 
     public abstract Optional<T> getBackendInfo();
 
-    final Iterable<ConnectionEntry> startReplay() {
+    final Collection<ConnectionEntry> startReplay() {
         lock.lock();
         lock.lock();
-        return queue.asIterable();
+        return queue.drain();
     }
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
     }
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder);
+        setForwarder(forwarder);
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        final long now = currentTime();
-        final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
-        while (it.hasNext()) {
-            final ConnectionEntry e = it.next();
-            forwarder.forwardEntry(e, now);
-            it.remove();
-        }
-
-        queue.setForwarder(forwarder);
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
     }
 
     @GuardedBy("lock")
index 9a459eee78bfa90bde70d885e5ef7180194616d7..8f71edb6ca2e36c2a7a006a8e3b462a60f96aa94 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.access.client;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -54,7 +55,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
          * @param enqueuedEntries Previously-enqueued entries
          * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
          */
          * @param enqueuedEntries Previously-enqueued entries
          * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
          */
-        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+        @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection<ConnectionEntry> enqueuedEntries);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
@@ -286,7 +287,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
 
             // Lock the old connection and get a reference to its entries
             final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
 
             // Lock the old connection and get a reference to its entries
-            final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+            final Collection<ConnectionEntry> replayIterable = conn.startReplay();
 
             // Finish the connection attempt
             final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
 
             // Finish the connection attempt
             final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
index 28902d28ba2142f59af6dbc055a4933c2eb76b3a..b7543410cd1a63128ac2da7ffde001cc9b3d778f 100644 (file)
@@ -11,9 +11,9 @@ import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
@@ -106,8 +106,20 @@ abstract class TransmitQueue {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
-    final Iterable<ConnectionEntry> asIterable() {
-        return Iterables.concat(inflight, pending);
+    /**
+     * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+     * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+     * returns and the successor is set will be replayed to the successor.
+     *
+     * @return Collection of entries present in the queue.
+     */
+    final Collection<ConnectionEntry> drain() {
+        final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+        ret.addAll(inflight);
+        ret.addAll(pending);
+        inflight.clear();
+        pending.clear();
+        return ret;
     }
 
     final long ticksStalling(final long now) {
     }
 
     final long ticksStalling(final long now) {
@@ -232,13 +244,32 @@ abstract class TransmitQueue {
         poisonQueue(pending, cause);
     }
 
         poisonQueue(pending, cause);
     }
 
-    final void setForwarder(final ReconnectForwarder forwarder) {
+    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
-        Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
         successor = Preconditions.checkNotNull(forwarder);
         successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}", this, successor);
+        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+        /*
+         * We need to account for entries which have been added between the time drain() was called and this method
+         * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+         * replay thread, there was an avenue for this to happen.
+         */
+        int count = 0;
+        ConnectionEntry entry = inflight.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = inflight.poll();
+            count++;
+        }
+
+        entry = pending.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = pending.poll();
+            count++;
+        }
+
+        LOG.debug("Connection {} queue spliced {} messages", this, count);
     }
 
     final void remove(final long now) {
     }
 
     final void remove(final long now) {
index 8e07804633440af79c1ec4ff42fa899617141255..b5f1bdac7e6a01f9d0d8eeb18d53609998b1b958 100644 (file)
@@ -17,7 +17,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import com.google.common.base.Ticker;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import com.google.common.base.Ticker;
-import com.google.common.collect.Iterables;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
@@ -74,15 +74,15 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testAsIterable() throws Exception {
 
     @Test
     public void testAsIterable() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
             queue.enqueue(new ConnectionEntry(request, callback, now), now);
         }
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
             queue.enqueue(new ConnectionEntry(request, callback, now), now);
         }
-        final Iterable<ConnectionEntry> entries = queue.asIterable();
-        Assert.assertEquals(sentMessages, Iterables.size(entries));
+        final Collection<ConnectionEntry> entries = queue.drain();
+        Assert.assertEquals(sentMessages, entries.size());
         Assert.assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
         Assert.assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
@@ -97,7 +97,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long requestSequence = 0L;
         final long txSequence = 0L;
         final long sessionId = 0L;
         final long requestSequence = 0L;
         final long txSequence = 0L;
         final long sessionId = 0L;
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
@@ -133,7 +133,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
     @Test
     public void testIsEmpty() throws Exception {
         Assert.assertTrue(queue.isEmpty());
     @Test
     public void testIsEmpty() throws Exception {
         Assert.assertTrue(queue.isEmpty());
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
@@ -142,8 +142,8 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testPeek() throws Exception {
 
     @Test
     public void testPeek() throws Exception {
-        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
-        final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
@@ -155,7 +155,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
 
     @Test
     public void testPoison() throws Exception {
 
     @Test
     public void testPoison() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
index 752c12771a67bb755c2063b2a16a6b10c2f5a48f..6dd38ce2137d55fbda4ed23bb5035162f9c24346 100644 (file)
@@ -19,7 +19,6 @@ import static org.opendaylight.controller.cluster.access.client.ConnectionEntryM
 import com.google.common.base.Ticker;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.base.Ticker;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
 import java.util.Arrays;
 import java.util.Collection;
 import com.google.common.testing.FakeTicker;
 import java.util.Arrays;
 import java.util.Collection;
@@ -111,8 +110,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
             probe.expectMsgClass(RequestEnvelope.class);
         }
         probe.expectNoMsg();
             probe.expectMsgClass(RequestEnvelope.class);
         }
         probe.expectNoMsg();
-        final Iterable<ConnectionEntry> entries = queue.asIterable();
-        assertEquals(sentMessages, Iterables.size(entries));
+        final Collection<ConnectionEntry> entries = queue.drain();
+        assertEquals(sentMessages, entries.size());
         assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
         assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
@@ -143,7 +142,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
-        queue.setForwarder(forwarder);
+        queue.setForwarder(forwarder, ticker.read());
         final long secondEnqueueNow = ticker.read();
         queue.enqueue(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
         final long secondEnqueueNow = ticker.read();
         queue.enqueue(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
index dae12af731846a278386dcf75f8348e61eb30752..1e8d03a8eccfc96594c4d06ca4c050fb736b9f8d 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -306,7 +307,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
             }
 
             @Override
             }
 
             @Override
-            void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+            void replayRequests(final Collection<ConnectionEntry> previousEntries) {
                 proxy.replayRequests(previousEntries);
             }
 
                 proxy.replayRequests(previousEntries);
             }
 
index f1939dc846ce74c1e5e69fdb4e03265f14b25904..b5fcca64ce0b61a8cfe626d6c2e88bb57d45ff83 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Collection;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 
 /**
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 
 /**
@@ -18,7 +19,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 abstract class HistoryReconnectCohort implements AutoCloseable {
     abstract ProxyReconnectCohort getProxy();
 
 abstract class HistoryReconnectCohort implements AutoCloseable {
     abstract ProxyReconnectCohort getProxy();
 
-    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
 
     @Override
     public abstract void close();
 
     @Override
     public abstract void close();
index 802d9ed0b33bc3a1fb5716a62d84bf03ffd266d3..7c3b2010c2ce809307646f852ee473e9d6c9cbf4 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -212,7 +213,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
         @GuardedBy("lock")
         @Override
 
         @GuardedBy("lock")
         @Override
-        void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+        void replayRequests(final Collection<ConnectionEntry> previousEntries) {
             // First look for our Create message
             Iterator<ConnectionEntry> it = previousEntries.iterator();
             while (it.hasNext()) {
             // First look for our Create message
             Iterator<ConnectionEntry> it = previousEntries.iterator();
             while (it.hasNext()) {
index 0ad4dc53364d97b68ccf99ee02f7c21bd7e21fdd..14bf0645f77c0a5300a3440d0f79dea55e438da5 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
@@ -18,7 +19,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
 
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
 
-    abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+    abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
 
     abstract ProxyHistory finishReconnect();
 
 
     abstract ProxyHistory finishReconnect();